Repository: flink
Updated Branches:
  refs/heads/master 718a17b28 -> 4f12356eb


http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
index c804830..47255fc 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
@@ -27,9 +27,7 @@ import akka.testkit.JavaTestKit;
 import akka.util.Timeout;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.test.TestingServer;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
@@ -185,7 +183,6 @@ public class JobManagerLeaderElectionTest extends 
TestLogger {
                                1,
                                1L,
                                AkkaUtils.getDefaultTimeout(),
-                               StreamingMode.BATCH_ONLY,
                                leaderElectionService,
                                submittedJobGraphStore,
                                checkpointRecoveryFactory

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
index 0b84474..c490a64 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.leaderelection;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -68,7 +67,7 @@ public class LeaderChangeStateCleanupTest extends TestLogger {
                
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
                
configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
numSlotsPerTM);
 
-               cluster = new 
LeaderElectionRetrievalTestingCluster(configuration, true, false, 
StreamingMode.BATCH_ONLY);
+               cluster = new 
LeaderElectionRetrievalTestingCluster(configuration, true, false);
                cluster.start(false); // TaskManagers don't have to register at 
the JobManager
 
                cluster.waitForActorsToBeAlive(); // we only wait until all 
actors are alive

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
index c83f548..c8cf868 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.leaderelection;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
 import scala.Option;
@@ -39,7 +38,6 @@ public class LeaderElectionRetrievalTestingCluster extends 
TestingCluster {
 
        private final Configuration userConfiguration;
        private final boolean useSingleActorSystem;
-       private final StreamingMode streamingMode;
 
        public List<TestingLeaderElectionService> leaderElectionServices;
        public List<TestingLeaderRetrievalService> leaderRetrievalServices;
@@ -49,13 +47,11 @@ public class LeaderElectionRetrievalTestingCluster extends 
TestingCluster {
        public LeaderElectionRetrievalTestingCluster(
                        Configuration userConfiguration,
                        boolean singleActorSystem,
-                       boolean synchronousDispatcher,
-                       StreamingMode streamingMode) {
-               super(userConfiguration, singleActorSystem, 
synchronousDispatcher, streamingMode);
+                       boolean synchronousDispatcher) {
+               super(userConfiguration, singleActorSystem, 
synchronousDispatcher);
 
                this.userConfiguration = userConfiguration;
                this.useSingleActorSystem = singleActorSystem;
-               this.streamingMode = streamingMode;
 
                leaderElectionServices = new 
ArrayList<TestingLeaderElectionService>();
                leaderRetrievalServices = new 
ArrayList<TestingLeaderRetrievalService>();
@@ -67,11 +63,6 @@ public class LeaderElectionRetrievalTestingCluster extends 
TestingCluster {
        }
 
        @Override
-       public StreamingMode streamingMode() {
-               return streamingMode;
-       }
-
-       @Override
        public boolean useSingleActorSystem() {
                return useSingleActorSystem;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index d30db9f..628d756 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -28,7 +28,6 @@ import akka.testkit.JavaTestKit;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.MemoryType;
-import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -76,7 +75,6 @@ public class TaskManagerComponentsStartupShutdownTest {
                        final ActorRef jobManager = 
JobManager.startJobManagerActors(
                                config,
                                actorSystem,
-                               StreamingMode.BATCH_ONLY,
                                JobManager.class,
                                MemoryArchivist.class)._1();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
index 1c15abf..efe0b32 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
@@ -24,7 +24,6 @@ import akka.actor.PoisonPill;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
@@ -91,7 +90,6 @@ public class TaskManagerProcessReapingTest {
                        JobManager.startJobManagerActors(
                                new Configuration(),
                                jmActorSystem,
-                               StreamingMode.BATCH_ONLY,
                                JobManager.class,
                                MemoryArchivist.class);
 
@@ -208,7 +206,7 @@ public class TaskManagerProcessReapingTest {
                                
cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
                                
cfg.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 256);
 
-                               TaskManager.runTaskManager("localhost", 
taskManagerPort, cfg, StreamingMode.BATCH_ONLY);
+                               TaskManager.runTaskManager("localhost", 
taskManagerPort, cfg);
 
                                // wait forever
                                Object lock = new Object();

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
index 130dcc5..1f328d8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
@@ -25,7 +25,6 @@ import akka.actor.Terminated;
 import akka.testkit.JavaTestKit;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.InstanceID;
@@ -475,7 +474,6 @@ public class TaskManagerRegistrationTest extends TestLogger 
{
                                                        NONE_STRING, // no 
actor name -> random
                                                        new 
Some<LeaderRetrievalService>(new 
StandaloneLeaderRetrievalService(jobManager.path().toString())),
                                                        false, // init network 
stack !!!
-                                                       
StreamingMode.BATCH_ONLY,
                                                        TaskManager.class);
 
                                        watch(taskManager);
@@ -597,7 +595,6 @@ public class TaskManagerRegistrationTest extends TestLogger 
{
                        actorSystem,
                        NONE_STRING,
                        NONE_STRING,
-                       StreamingMode.BATCH_ONLY,
                        JobManager.class,
                        MemoryArchivist.class)._1();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
index 9cc8170..ac8eadf 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
@@ -24,7 +24,6 @@ import org.apache.commons.io.FileUtils;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.runtime.StreamingMode;
 import org.junit.Test;
 
 import java.io.File;
@@ -54,8 +53,11 @@ public class TaskManagerStartupTest {
                        final int port = blocker.getLocalPort();
 
                        try {
-                               TaskManager.runTaskManager(localHostName, port, 
new Configuration(),
-                                                                               
        StreamingMode.BATCH_ONLY, TaskManager.class);
+                               TaskManager.runTaskManager(
+                                       localHostName,
+                                       port,
+                                       new Configuration(),
+                                       TaskManager.class);
                                fail("This should fail with an IOException");
                        }
                        catch (IOException e) {
@@ -103,7 +105,7 @@ public class TaskManagerStartupTest {
                        
cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 21656);
 
                        try {
-                               TaskManager.runTaskManager("localhost", 0, cfg, 
StreamingMode.BATCH_ONLY);
+                               TaskManager.runTaskManager("localhost", 0, cfg);
                                fail("Should fail synchronously with an 
exception");
                        }
                        catch (IOException e) {
@@ -136,11 +138,12 @@ public class TaskManagerStartupTest {
                        Configuration cfg = new Configuration();
                        
cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
                        
cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 21656);
+                       
cfg.setString(ConfigConstants.TASK_MANAGER_MEMORY_PRE_ALLOCATE_KEY, "true");
 
                        // something invalid
                        
cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -42);
                        try {
-                               TaskManager.runTaskManager("localhost", 0, cfg, 
StreamingMode.BATCH_ONLY);
+                               TaskManager.runTaskManager("localhost", 0, cfg);
                                fail("Should fail synchronously with an 
exception");
                        }
                        catch (IllegalConfigurationException e) {
@@ -152,7 +155,7 @@ public class TaskManagerStartupTest {
                                                                        
ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE) >> 20;
                        
cfg.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memSize);
                        try {
-                               TaskManager.runTaskManager("localhost", 0, cfg, 
StreamingMode.BATCH_ONLY);
+                               TaskManager.runTaskManager("localhost", 0, cfg);
                                fail("Should fail synchronously with an 
exception");
                        }
                        catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java
index 0641493..45be1aa 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java
@@ -22,7 +22,6 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.JobManagerMode;
@@ -190,8 +189,7 @@ public class JobManagerProcess extends TestJvmProcess {
                private static final Logger LOG = 
LoggerFactory.getLogger(JobManagerProcessEntryPoint.class);
 
                /**
-                * Runs the JobManager process in {@link 
JobManagerMode#CLUSTER} and {@link
-                * StreamingMode#STREAMING} (can handle both batch and 
streaming jobs).
+                * Runs the JobManager process in {@link 
JobManagerMode#CLUSTER}.
                 *
                 * <p><strong>Required argument</strong>: <code>port</code>. 
Start the process with
                 * <code>--port PORT</code>.
@@ -210,8 +208,7 @@ public class JobManagerProcess extends TestJvmProcess {
                                LOG.info("Configuration: {}.", config);
 
                                // Run the JobManager
-                               JobManager.runJobManager(config, 
JobManagerMode.CLUSTER, StreamingMode.STREAMING,
-                                               "localhost", port);
+                               JobManager.runJobManager(config, 
JobManagerMode.CLUSTER, "localhost", port);
 
                                // Run forever. Forever, ever? Forever, ever!
                                new CountDownLatch(1).await();

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java
index 86449a8..392ccf0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.testutils;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -95,10 +94,7 @@ public class TaskManagerProcess extends TestJvmProcess {
                private static final Logger LOG = 
LoggerFactory.getLogger(TaskManagerProcessEntryPoint.class);
 
                /**
-                * Runs the JobManager process in {@link 
StreamingMode#STREAMING} (can handle both batch
-                * and streaming jobs).
-                *
-                * <p>All arguments are parsed to a {@link Configuration} and 
passed to the Taskmanager,
+                * All arguments are parsed to a {@link Configuration} and 
passed to the Taskmanager,
                 * for instance: <code>--recovery.mode ZOOKEEPER 
--recovery.zookeeper.quorum "xyz:123:456"</code>.
                 */
                public static void main(String[] args) throws Exception {
@@ -117,8 +113,7 @@ public class TaskManagerProcess extends TestJvmProcess {
                                LOG.info("Configuration: {}.", config);
 
                                // Run the TaskManager
-                               
TaskManager.selectNetworkInterfaceAndRunTaskManager(
-                                               config, 
StreamingMode.STREAMING, TaskManager.class);
+                               
TaskManager.selectNetworkInterfaceAndRunTaskManager(config, TaskManager.class);
 
                                // Run forever
                                new CountDownLatch(1).await();

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
index fa3fc8b..b96411e 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
@@ -24,7 +24,6 @@ import akka.actor._
 import akka.testkit.{ImplicitSender, TestKit}
 
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.runtime.StreamingMode
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.instance._
 import 
org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest.PlainForwardingActor
@@ -162,7 +161,6 @@ ImplicitSender with WordSpecLike with Matchers with 
BeforeAndAfterAll {
       _system,
       None,
       None,
-      StreamingMode.BATCH_ONLY,
       classOf[JobManager],
       classOf[MemoryArchivist])
     new AkkaActorGateway(jm, null)

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index 703d7bf..5eee4e5 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -24,7 +24,6 @@ import akka.pattern.ask
 import akka.actor.{ActorRef, Props, ActorSystem}
 import akka.testkit.CallingThreadDispatcher
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
-import org.apache.flink.runtime.StreamingMode
 import org.apache.flink.runtime.jobmanager.JobManager
 import org.apache.flink.runtime.leaderelection.LeaderElectionService
 import org.apache.flink.runtime.minicluster.FlinkMiniCluster
@@ -45,21 +44,13 @@ import scala.concurrent.{Await, Future}
 class TestingCluster(
                       userConfiguration: Configuration,
                       singleActorSystem: Boolean,
-                      synchronousDispatcher: Boolean,
-                      streamingMode: StreamingMode)
+                      synchronousDispatcher: Boolean)
   extends FlinkMiniCluster(
     userConfiguration,
-    singleActorSystem,
-    streamingMode) {
+    singleActorSystem) {
 
-
-  def this(userConfiguration: Configuration,
-           singleActorSystem: Boolean,
-           synchronousDispatcher: Boolean)
-  = this(userConfiguration, singleActorSystem, synchronousDispatcher, 
StreamingMode.BATCH_ONLY)
-
-  def this(userConfiguration: Configuration, singleActorSystem: Boolean)
-  = this(userConfiguration, singleActorSystem, false)
+  def this(userConfiguration: Configuration, singleActorSystem: Boolean) =
+    this(userConfiguration, singleActorSystem, false)
 
   def this(userConfiguration: Configuration) = this(userConfiguration, true, 
false)
 
@@ -127,7 +118,6 @@ class TestingCluster(
         executionRetries,
         delayBetweenRetries,
         timeout,
-        streamingMode,
         leaderElectionService,
         submittedJobsGraphs,
         checkpointRecoveryFactory))
@@ -153,7 +143,6 @@ class TestingCluster(
       Some(tmActorName),
       Some(createLeaderRetrievalService),
       numTaskManagers == 1,
-      streamingMode,
       classOf[TestingTaskManager])
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
index be72003..7cbff48 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.testingUtils
 
 import akka.actor.ActorRef
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.runtime.StreamingMode
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.instance.InstanceManager
@@ -43,7 +42,6 @@ import scala.language.postfixOps
   * @param defaultExecutionRetries
   * @param delayBetweenRetries
   * @param timeout
-  * @param mode
   */
 class TestingJobManager(
     flinkConfiguration: Configuration,
@@ -55,7 +53,6 @@ class TestingJobManager(
     defaultExecutionRetries: Int,
     delayBetweenRetries: Long,
     timeout: FiniteDuration,
-    mode: StreamingMode,
     leaderElectionService: LeaderElectionService,
     submittedJobGraphs : SubmittedJobGraphStore,
     checkpointRecoveryFactory : CheckpointRecoveryFactory)
@@ -69,7 +66,6 @@ class TestingJobManager(
     defaultExecutionRetries,
     delayBetweenRetries,
     timeout,
-    mode,
     leaderElectionService,
     submittedJobGraphs,
     checkpointRecoveryFactory)

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 553b686..02b0cf8 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -29,10 +29,9 @@ import grizzled.slf4j.Logger
 
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.jobmanager.{MemoryArchivist, JobManager}
-import org.apache.flink.runtime.{LogMessages, LeaderSessionMessageFilter, 
FlinkActor, StreamingMode}
+import org.apache.flink.runtime.{LogMessages, LeaderSessionMessageFilter, 
FlinkActor}
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.instance.{AkkaActorGateway, ActorGateway}
-import org.apache.flink.runtime.leaderelection.StandaloneLeaderElectionService
 import 
org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService
 import 
org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtAnyJobManager
 import org.apache.flink.runtime.taskmanager.TaskManager
@@ -223,7 +222,6 @@ object TestingUtils {
       None,
       leaderRetrievalService,
       useLocalCommunication,
-      StreamingMode.BATCH_ONLY,
       classOf[TestingTaskManager]
     )
 
@@ -274,7 +272,6 @@ object TestingUtils {
         actorSystem,
         Some(JobManager.JOB_MANAGER_NAME),
         Some(JobManager.ARCHIVE_NAME),
-        StreamingMode.BATCH_ONLY,
         classOf[JobManager],
         classOf[MemoryArchivist])
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
 
b/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
index de2f3ec..00e36ab 100644
--- 
a/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
+++ 
b/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
@@ -21,7 +21,6 @@ package org.apache.flink.api.scala
 import java.io._
 import java.util.concurrent.TimeUnit
 
-import org.apache.flink.runtime.StreamingMode
 import org.apache.flink.test.util.{ForkableFlinkMiniCluster, TestBaseUtils}
 import org.apache.flink.util.TestLogger
 import org.junit.{AfterClass, BeforeClass, Test, Assert}
@@ -261,7 +260,6 @@ object ScalaShellITCase {
     val cl = TestBaseUtils.startCluster(
       1,
       parallelism,
-      StreamingMode.BATCH_ONLY,
       false,
       false,
       false)

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/TezProgramTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/TezProgramTestBase.java
 
b/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/TezProgramTestBase.java
index 6186a47..eda9d1a 100644
--- 
a/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/TezProgramTestBase.java
+++ 
b/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/TezProgramTestBase.java
@@ -20,7 +20,6 @@ package org.apache.flink.tez.test;
 
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.tez.client.LocalTezEnvironment;
 import org.junit.Assert;
@@ -41,7 +40,7 @@ public abstract class TezProgramTestBase extends 
AbstractTestBase {
     }
 
     public TezProgramTestBase(Configuration config) {
-        super (config, StreamingMode.BATCH_ONLY);
+        super (config);
     }
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
 
b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index e6e179c..3550bd9 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -31,7 +31,6 @@ import org.apache.curator.test.TestingServer;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader;
@@ -185,7 +184,7 @@ public abstract class KafkaTestBase extends TestLogger {
                
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
                
flinkConfig.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "0 s");
 
-               flink = new ForkableFlinkMiniCluster(flinkConfig, false, 
StreamingMode.STREAMING);
+               flink = new ForkableFlinkMiniCluster(flinkConfig, false);
                flink.start();
 
                flinkPort = flink.getLeaderRPCPort();

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
index e76a2c0..6ad7352 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 
@@ -100,7 +99,7 @@ public class LocalStreamEnvironment extends 
StreamExecutionEnvironment {
                        LOG.info("Running job on local embedded Flink mini 
cluster");
                }
 
-               LocalFlinkMiniCluster exec = new 
LocalFlinkMiniCluster(configuration, true, StreamingMode.STREAMING);
+               LocalFlinkMiniCluster exec = new 
LocalFlinkMiniCluster(configuration, true);
                try {
                        exec.start();
                        return exec.submitJobAndWait(jobGraph, 
getConfig().isSysoutLoggingEnabled());

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
index 4e02f2c..6cd2d41 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.streaming.util;
 
-import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.test.util.TestBaseUtils;
 
@@ -69,7 +68,7 @@ public class StreamingMultipleProgramsTestBase extends 
TestBaseUtils {
 
        @BeforeClass
        public static void setup() throws Exception {
-               cluster = TestBaseUtils.startCluster(1, DEFAULT_PARALLELISM, 
StreamingMode.STREAMING, false, false, true);
+               cluster = TestBaseUtils.startCluster(1, DEFAULT_PARALLELISM, 
false, false, true);
                TestStreamEnvironment.setAsContext(cluster, 
DEFAULT_PARALLELISM);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
index ce3aa86..50ed1cf 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
@@ -19,7 +19,6 @@
 package org.apache.flink.streaming.util;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.test.util.AbstractTestBase;
 
 import org.junit.Test;
@@ -34,7 +33,7 @@ public abstract class StreamingProgramTestBase extends 
AbstractTestBase {
        
        
        public StreamingProgramTestBase() {
-               super(new Configuration(), StreamingMode.STREAMING);
+               super(new Configuration());
                setParallelism(DEFAULT_PARALLELISM);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala
 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala
index 3342e1e..ee415d1 100644
--- 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala
+++ 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala
@@ -18,7 +18,6 @@
 
 package org.apache.flink.streaming.api.scala
 
-import org.apache.flink.runtime.StreamingMode
 import org.apache.flink.streaming.util.TestStreamEnvironment
 import org.apache.flink.test.util.{ForkableFlinkMiniCluster, TestBaseUtils}
 import org.scalatest.BeforeAndAfterAll
@@ -37,7 +36,6 @@ trait ScalaStreamingMultipleProgramsTestBase
       TestBaseUtils.startCluster(
         1,
         parallelism,
-        StreamingMode.STREAMING,
         false,
         false,
         true

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
 
b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
index 005382a..c2da691 100644
--- 
a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
+++ 
b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
@@ -21,7 +21,6 @@ package org.apache.flink.test.util;
 import com.google.common.base.Charsets;
 import com.google.common.io.Files;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.StreamingMode;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.File;
@@ -44,9 +43,6 @@ public abstract class AbstractTestBase extends TestBaseUtils {
        
        private final FiniteDuration timeout;
 
-       /** Mode (batch-only / streaming) in which to start the system */
-       private final StreamingMode streamingMode;
-
        protected int taskManagerNumSlots = 1;
 
        protected int numTaskManagers = 1;
@@ -55,9 +51,8 @@ public abstract class AbstractTestBase extends TestBaseUtils {
        protected ForkableFlinkMiniCluster executor;
        
 
-       public AbstractTestBase(Configuration config, StreamingMode 
streamingMode) {
+       public AbstractTestBase(Configuration config) {
                this.config = Objects.requireNonNull(config);
-               this.streamingMode = Objects.requireNonNull(streamingMode);
                this.tempFiles = new ArrayList<File>();
 
                timeout = AkkaUtils.getTimeout(config);
@@ -71,7 +66,6 @@ public abstract class AbstractTestBase extends TestBaseUtils {
                this.executor = startCluster(
                        numTaskManagers,
                        taskManagerNumSlots,
-                       streamingMode,
                        false,
                        false,
                        true);

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
 
b/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
index f2de650..e639c80 100644
--- 
a/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
+++ 
b/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
@@ -22,7 +22,6 @@ import java.util.Comparator;
 
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.StreamingMode;
 import org.junit.Assert;
 import org.junit.Test;
 import org.apache.flink.api.java.tuple.Tuple;
@@ -51,7 +50,7 @@ public abstract class JavaProgramTestBase extends 
AbstractTestBase {
        }
        
        public JavaProgramTestBase(Configuration config) {
-               super(config, StreamingMode.BATCH_ONLY);
+               super(config);
                setTaskManagerNumSlots(parallelism);
        }
        

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
 
b/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
index 27710d7..38116e2 100644
--- 
a/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
+++ 
b/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.test.util;
 
-import org.apache.flink.runtime.StreamingMode;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.runners.Parameterized;
@@ -104,7 +103,6 @@ public class MultipleProgramsTestBase extends TestBaseUtils 
{
                cluster = TestBaseUtils.startCluster(
                        1,
                        DEFAULT_PARALLELISM,
-                       StreamingMode.BATCH_ONLY,
                        startWebServer,
                        false,
                        true);

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java 
b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index 608000d..2963418 100644
--- 
a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ 
b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -32,7 +32,6 @@ import org.apache.commons.io.IOUtils;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.TestLogger;
@@ -104,7 +103,6 @@ public class TestBaseUtils extends TestLogger {
        public static ForkableFlinkMiniCluster startCluster(
                int numTaskManagers,
                int taskManagerNumSlots,
-               StreamingMode mode,
                boolean startWebserver,
                boolean startZooKeeper,
                boolean singleActorSystem) throws Exception {
@@ -121,12 +119,11 @@ public class TestBaseUtils extends TestLogger {
                        config.setString(ConfigConstants.RECOVERY_MODE, 
"zookeeper");
                }
 
-               return startCluster(config, mode, singleActorSystem);
+               return startCluster(config, singleActorSystem);
        }
 
        public static ForkableFlinkMiniCluster startCluster(
                Configuration config,
-               StreamingMode mode,
                boolean singleActorSystem) throws Exception {
 
                logDir = File.createTempFile("TestBaseUtils-logdir", null);
@@ -144,7 +141,7 @@ public class TestBaseUtils extends TestLogger {
                config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 
8081);
                config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, 
logFile.toString());
 
-               ForkableFlinkMiniCluster cluster =  new 
ForkableFlinkMiniCluster(config, singleActorSystem, mode);
+               ForkableFlinkMiniCluster cluster =  new 
ForkableFlinkMiniCluster(config, singleActorSystem);
 
                cluster.start();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-test-utils/src/main/scala/org/apache/flink/test/util/FlinkTestBase.scala
----------------------------------------------------------------------
diff --git 
a/flink-test-utils/src/main/scala/org/apache/flink/test/util/FlinkTestBase.scala
 
b/flink-test-utils/src/main/scala/org/apache/flink/test/util/FlinkTestBase.scala
index 612fdc5..715bc55 100644
--- 
a/flink-test-utils/src/main/scala/org/apache/flink/test/util/FlinkTestBase.scala
+++ 
b/flink-test-utils/src/main/scala/org/apache/flink/test/util/FlinkTestBase.scala
@@ -18,7 +18,6 @@
 
 package org.apache.flink.test.util
 
-import org.apache.flink.runtime.StreamingMode
 import org.scalatest.{Suite, BeforeAndAfter}
 
 /** Mixin to start and stop a ForkableFlinkMiniCluster automatically for Scala 
based tests.
@@ -57,7 +56,6 @@ trait FlinkTestBase extends BeforeAndAfter {
     val cl = TestBaseUtils.startCluster(
       1,
       parallelism,
-      StreamingMode.BATCH_ONLY,
       false,
       false,
       true)

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
 
b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
index 11eb174..ed336d7 100644
--- 
a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
+++ 
b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
@@ -25,7 +25,6 @@ import akka.pattern.Patterns._
 import akka.pattern.ask
 import org.apache.curator.test.TestingCluster
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
-import org.apache.flink.runtime.StreamingMode
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.jobmanager.{JobManager, RecoveryMode}
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
@@ -45,13 +44,9 @@ import scala.concurrent.{Await, Future}
  *                          same [[ActorSystem]], otherwise false.
  */
 class ForkableFlinkMiniCluster(
-                                userConfiguration: Configuration,
-                                singleActorSystem: Boolean,
-                                streamingMode: StreamingMode)
-  extends LocalFlinkMiniCluster(userConfiguration, singleActorSystem, 
streamingMode) {
-
-  def this(userConfiguration: Configuration, singleActorSystem: Boolean)
-  = this(userConfiguration, singleActorSystem, StreamingMode.BATCH_ONLY)
+    userConfiguration: Configuration,
+    singleActorSystem: Boolean)
+  extends LocalFlinkMiniCluster(userConfiguration, singleActorSystem) {
 
   def this(userConfiguration: Configuration) = this(userConfiguration, true)
 
@@ -103,7 +98,6 @@ class ForkableFlinkMiniCluster(
       actorSystem,
       Some(jobManagerName),
       Some(archiveName),
-      streamingMode,
       classOf[TestingJobManager],
       classOf[TestingMemoryArchivist])
 
@@ -137,7 +131,6 @@ class ForkableFlinkMiniCluster(
       Some(TaskManager.TASK_MANAGER_NAME + index),
       Some(createLeaderRetrievalService()),
       localExecution,
-      streamingMode,
       classOf[TestingTaskManager])
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
index 84022f0..cfef35f 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
@@ -70,7 +69,7 @@ public class EventTimeAllWindowCheckpointingITCase extends 
TestLogger {
                config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 
48);
                
config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "0 ms");
 
-               cluster = new ForkableFlinkMiniCluster(config, false, 
StreamingMode.STREAMING);
+               cluster = new ForkableFlinkMiniCluster(config, false);
                cluster.start();
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
index 975582b..2284456 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
@@ -26,7 +26,6 @@ import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
@@ -71,7 +70,7 @@ public class EventTimeWindowCheckpointingITCase extends 
TestLogger {
                config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 
48);
                
config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "0 ms");
 
-               cluster = new ForkableFlinkMiniCluster(config, false, 
StreamingMode.STREAMING);
+               cluster = new ForkableFlinkMiniCluster(config, false);
                cluster.start();
        }
 
@@ -555,6 +554,22 @@ public class EventTimeWindowCheckpointingITCase extends 
TestLogger {
                public void open(Configuration parameters) throws Exception {
                        // this sink can only work with DOP 1
                        assertEquals(1, 
getRuntimeContext().getNumberOfParallelSubtasks());
+
+                       // it can happen that a checkpoint happens when the 
complete success state is
+                       // already set. In that case we restart with the final 
state and would never
+                       // finish because no more elements arrive.
+                       if (windowCounts.size() == numKeys) {
+                               boolean seenAll = true;
+                               for (Integer windowCount: 
windowCounts.values()) {
+                                       if (windowCount != numWindowsExpected) {
+                                               seenAll = false;
+                                               break;
+                                       }
+                               }
+                               if (seenAll) {
+                                       throw new SuccessException();
+                               }
+                       }
                }
 
                @Override
@@ -597,8 +612,8 @@ public class EventTimeWindowCheckpointingITCase extends 
TestLogger {
                                windowCounts.put(value.f0, 1);
                        }
 
-                       boolean seenAll = true;
                        if (windowCounts.size() == numKeys) {
+                               boolean seenAll = true;
                                for (Integer windowCount: 
windowCounts.values()) {
                                        if (windowCount < numWindowsExpected) {
                                                seenAll = false;

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
index e297486..ce3c51a 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
@@ -25,7 +25,6 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
@@ -36,9 +35,6 @@ import 
org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
-import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
-import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
 import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
@@ -85,7 +81,7 @@ public class WindowCheckpointingITCase extends TestLogger {
                config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 
48);
                
config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "0 ms");
 
-               cluster = new ForkableFlinkMiniCluster(config, false, 
StreamingMode.STREAMING);
+               cluster = new ForkableFlinkMiniCluster(config, false);
                cluster.start();
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
 
b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
index 971a173..7bdfc9d 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -54,7 +53,7 @@ public class StreamingScalabilityAndLatency {
                        config.setInteger("taskmanager.net.server.numThreads", 
1);
                        config.setInteger("taskmanager.net.client.numThreads", 
1);
 
-                       cluster = new LocalFlinkMiniCluster(config, false, 
StreamingMode.STREAMING);
+                       cluster = new LocalFlinkMiniCluster(config, false);
                        cluster.start();
                        
                        runPartitioningProgram(cluster.getLeaderRPCPort(), 
PARALLELISM);

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractJobManagerProcessFailureRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractJobManagerProcessFailureRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractJobManagerProcessFailureRecoveryITCase.java
index ba5ff1c..2f6b762 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractJobManagerProcessFailureRecoveryITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractJobManagerProcessFailureRecoveryITCase.java
@@ -23,7 +23,6 @@ import akka.actor.ActorSystem;
 import org.apache.commons.io.FileUtils;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
@@ -168,7 +167,7 @@ public abstract class 
AbstractJobManagerProcessFailureRecoveryITCase extends Tes
                                TaskManager.startTaskManagerComponentsAndActor(
                                                config, tmActorSystem[i], 
"localhost",
                                                Option.<String>empty(), 
Option.<LeaderRetrievalService>empty(),
-                                               false, StreamingMode.STREAMING, 
TaskManager.class);
+                                               false, TaskManager.class);
                        }
 
                        // Test actor system

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index c02fa6c..54505ac 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -26,7 +26,6 @@ import akka.util.Timeout;
 import org.apache.commons.io.FileUtils;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
@@ -128,7 +127,6 @@ public abstract class 
AbstractTaskManagerProcessFailureRecoveryTest extends Test
                        ActorRef jmActor = JobManager.startJobManagerActors(
                                jmConfig,
                                jmActorSystem,
-                               StreamingMode.STREAMING,
                                JobManager.class,
                                MemoryArchivist.class)._1();
 
@@ -379,7 +377,7 @@ public abstract class 
AbstractTaskManagerProcessFailureRecoveryTest extends Test
                                
cfg.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 100);
                                
cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
 
-                               
TaskManager.selectNetworkInterfaceAndRunTaskManager(cfg, 
StreamingMode.STREAMING, TaskManager.class);
+                               
TaskManager.selectNetworkInterfaceAndRunTaskManager(cfg, TaskManager.class);
 
                                // wait forever
                                Object lock = new Object();

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java
index e590067..51a0765 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java
@@ -23,7 +23,6 @@ import akka.actor.ActorSystem;
 import org.apache.commons.io.FileUtils;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.instance.ActorGateway;
@@ -177,7 +176,7 @@ public class JobManagerCheckpointRecoveryITCase extends 
TestLogger {
                        TaskManager.startTaskManagerComponentsAndActor(
                                        config, taskManagerSystem, "localhost",
                                        Option.<String>empty(), 
Option.<LeaderRetrievalService>empty(),
-                                       false, StreamingMode.STREAMING, 
TaskManager.class);
+                                       false, TaskManager.class);
 
                        {
                                // Initial submission

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
index 6dce370..37e4e38 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
@@ -30,7 +30,6 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.jobmanager.JobManager;
@@ -104,7 +103,6 @@ public class ProcessFailureCancelingITCase {
                        ActorRef jmActor = JobManager.startJobManagerActors(
                                jmConfig,
                                jmActorSystem,
-                               StreamingMode.BATCH_ONLY,
                                JobManager.class,
                                MemoryArchivist.class)._1();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
index b4a5018..7710f06 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
@@ -25,7 +25,6 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.test.testdata.WordCountData;
 import org.apache.flink.test.util.ForkableFlinkMiniCluster;
@@ -76,7 +75,7 @@ public class IPv6HostnamesITCase extends TestLogger {
                        
conf.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
                        
conf.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
                        
-                       flink = new ForkableFlinkMiniCluster(conf, false, 
StreamingMode.BATCH_ONLY);
+                       flink = new ForkableFlinkMiniCluster(conf, false);
                        flink.start();
 
                        ExecutionEnvironment env = 
ExecutionEnvironment.createRemoteEnvironment(addressString, 
flink.getLeaderRPCPort());

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
 
b/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
index fa70039..3379325 100644
--- 
a/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
+++ 
b/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
@@ -20,7 +20,6 @@ package org.apache.flink.yarn
 
 import akka.actor.ActorRef
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.runtime.StreamingMode
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.instance.InstanceManager
@@ -61,7 +60,6 @@ class TestingYarnJobManager(
     defaultExecutionRetries: Int,
     delayBetweenRetries: Long,
     timeout: FiniteDuration,
-    mode: StreamingMode,
     leaderElectionService: LeaderElectionService,
     submittedJobGraphs : SubmittedJobGraphStore,
     checkpointRecoveryFactory : CheckpointRecoveryFactory)
@@ -75,7 +73,6 @@ class TestingYarnJobManager(
     defaultExecutionRetries,
     delayBetweenRetries,
     timeout,
-    mode,
     leaderElectionService,
     submittedJobGraphs,
     checkpointRecoveryFactory)

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java
index cda1cab..993d24e 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java
@@ -145,7 +145,6 @@ public abstract class FlinkYarnClientBase extends 
AbstractFlinkYarnClient {
        private org.apache.flink.configuration.Configuration flinkConfiguration;
 
        private boolean detached;
-       private boolean streamingMode;
 
        private String customName = null;
 
@@ -601,7 +600,6 @@ public abstract class FlinkYarnClientBase extends 
AbstractFlinkYarnClient {
                appMasterEnv.put(FlinkYarnClient.ENV_CLIENT_USERNAME, 
UserGroupInformation.getCurrentUser().getShortUserName());
                appMasterEnv.put(FlinkYarnClient.ENV_SLOTS, 
String.valueOf(slots));
                appMasterEnv.put(FlinkYarnClient.ENV_DETACHED, 
String.valueOf(detached));
-               appMasterEnv.put(FlinkYarnClient.ENV_STREAMING_MODE, 
String.valueOf(streamingMode));
 
                if(dynamicPropertiesEncoded != null) {
                        
appMasterEnv.put(FlinkYarnClient.ENV_DYNAMIC_PROPERTIES, 
dynamicPropertiesEncoded);
@@ -777,11 +775,6 @@ public abstract class FlinkYarnClientBase extends 
AbstractFlinkYarnClient {
        }
 
        @Override
-       public void setStreamingMode(boolean streamingMode) {
-               this.streamingMode = streamingMode;
-       }
-
-       @Override
        public void setName(String name) {
                if(name == null) {
                        throw new IllegalArgumentException("The passed name is 
null");

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
index d05b658..46db430 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
@@ -25,9 +25,7 @@ import java.util.Map;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.taskmanager.TaskManager;
-import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.util.EnvironmentInformation;
-import org.apache.flink.yarn.YarnTaskManager;
 
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
@@ -52,11 +50,8 @@ public class YarnTaskManagerRunner {
 
                // try to parse the command line arguments
                final Configuration configuration;
-               final StreamingMode mode;
                try {
-                       scala.Tuple2<Configuration, StreamingMode> res = 
TaskManager.parseArgsAndLoadConfig(args);
-                       configuration = res._1();
-                       mode = res._2();
+                       configuration = 
TaskManager.parseArgsAndLoadConfig(args);
                }
                catch (Throwable t) {
                        LOG.error(t.getMessage(), t);
@@ -94,8 +89,7 @@ public class YarnTaskManagerRunner {
                        @Override
                        public Object run() {
                                try {
-                                       
TaskManager.selectNetworkInterfaceAndRunTaskManager(configuration,
-                                               mode, taskManager);
+                                       
TaskManager.selectNetworkInterfaceAndRunTaskManager(configuration, taskManager);
                                }
                                catch (Throwable t) {
                                        LOG.error("Error while starting the 
TaskManager", t);

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala 
b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala
index 6bb3852..dcddad8 100644
--- 
a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala
+++ 
b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala
@@ -24,7 +24,6 @@ import java.security.PrivilegedAction
 import akka.actor.ActorSystem
 import org.apache.flink.client.CliFrontend
 import org.apache.flink.configuration.{GlobalConfiguration, Configuration, 
ConfigConstants}
-import org.apache.flink.runtime.StreamingMode
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.jobmanager.{MemoryArchivist, JobManagerMode, 
JobManager}
 import org.apache.flink.runtime.util.EnvironmentInformation
@@ -93,14 +92,6 @@ abstract class ApplicationMasterBase {
       val currDir = env.get(Environment.PWD.key())
       require(currDir != null, "Current directory unknown.")
 
-      val streamingMode = if(ApplicationMasterBase.hasStreamingMode(env)) {
-        log.info("Starting ApplicationMaster/JobManager in streaming mode")
-        StreamingMode.STREAMING
-      } else {
-        log.info("Starting ApplicationMaster/JobManager in batch only mode")
-        StreamingMode.BATCH_ONLY
-      }
-
       // Note that we use the "ownHostname" given by YARN here, to make sure
       // we use the hostnames given by YARN consistently throughout akka.
       // for akka "localhost" and "localhost.localdomain" are different actors.
@@ -124,7 +115,6 @@ abstract class ApplicationMasterBase {
         JobManager.startActorSystemAndJobManagerActors(
           config,
           JobManagerMode.CLUSTER,
-          streamingMode,
           ownHostname,
           0,
           getJobManagerClass,

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala 
b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index 4ba80ea..8494f08 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -33,11 +33,9 @@ import 
org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
 import org.apache.flink.runtime.jobgraph.JobStatus
 import org.apache.flink.runtime.jobmanager.{SubmittedJobGraphStore, JobManager}
 import org.apache.flink.runtime.leaderelection.LeaderElectionService
-import org.apache.flink.runtime.messages.JobManagerMessages.{RequestJobStatus, 
CurrentJobStatus,
-JobNotFound}
+import org.apache.flink.runtime.messages.JobManagerMessages.{RequestJobStatus, 
CurrentJobStatus, JobNotFound}
 import org.apache.flink.runtime.messages.Messages.Acknowledge
 import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus
-import org.apache.flink.runtime.StreamingMode
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.instance.InstanceManager
 import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => 
FlinkScheduler}
@@ -75,7 +73,6 @@ import scala.util.Try
   * @param defaultExecutionRetries Number of default execution retries
   * @param delayBetweenRetries Delay between retries
   * @param timeout Timeout for futures
-  * @param mode StreamingMode in which the system shall be started
   * @param leaderElectionService LeaderElectionService to participate in the 
leader election
   */
 class YarnJobManager(
@@ -88,7 +85,6 @@ class YarnJobManager(
     defaultExecutionRetries: Int,
     delayBetweenRetries: Long,
     timeout: FiniteDuration,
-    mode: StreamingMode,
     leaderElectionService: LeaderElectionService,
     submittedJobGraphs : SubmittedJobGraphStore,
     checkpointRecoveryFactory : CheckpointRecoveryFactory)
@@ -102,7 +98,6 @@ class YarnJobManager(
     defaultExecutionRetries,
     delayBetweenRetries,
     timeout,
-    mode,
     leaderElectionService,
     submittedJobGraphs,
     checkpointRecoveryFactory) {
@@ -603,8 +598,7 @@ class YarnJobManager(
           hasLog4j,
           yarnClientUsername,
           conf,
-          taskManagerLocalResources,
-          ApplicationMasterBase.hasStreamingMode(env))
+          taskManagerLocalResources)
       )
 
       context.system.scheduler.scheduleOnce(
@@ -665,13 +659,11 @@ class YarnJobManager(
       hasLog4j: Boolean,
       yarnClientUsername: String,
       yarnConf: Configuration,
-      taskManagerLocalResources: Map[String, LocalResource],
-      streamingMode: Boolean)
-    : ContainerLaunchContext = {
+      taskManagerLocalResources: Map[String, LocalResource]) : 
ContainerLaunchContext = {
     log.info("Create container launch context.")
     val ctx = Records.newRecord(classOf[ContainerLaunchContext])
 
-    val heapLimit = calculateMemoryLimits(memoryLimit, streamingMode)
+    val heapLimit = calculateMemoryLimits(memoryLimit)
 
     val javaOpts = 
flinkConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "")
     val tmCommand = new StringBuilder(s"$$JAVA_HOME/bin/java -Xms${heapLimit}m 
" +
@@ -694,13 +686,6 @@ class YarnJobManager(
       s"${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/taskmanager.out 2> " +
       s"${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/taskmanager.err"
 
-    tmCommand ++= " --streamingMode"
-    if(streamingMode) {
-      tmCommand ++= " streaming"
-    } else {
-      tmCommand ++= " batch"
-    }
-
     ctx.setCommands(Collections.singletonList(tmCommand.toString()))
 
     log.info(s"Starting TM with command=${tmCommand.toString()}")
@@ -736,15 +721,22 @@ class YarnJobManager(
   /**
    * Calculate the correct JVM heap memory limit.
    * @param memoryLimit The maximum memory in megabytes.
-   * @param streamingMode True if this is a streaming cluster.
    * @return A Tuple2 containing the heap and the offHeap limit in megabytes.
    */
-  private def calculateMemoryLimits(memoryLimit: Long, streamingMode: 
Boolean): Long = {
+  private def calculateMemoryLimits(memoryLimit: Long): Long = {
+    val eagerAllocation = flinkConfiguration.getBoolean(
+      ConfigConstants.TASK_MANAGER_MEMORY_PRE_ALLOCATE_KEY,
+      ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_PRE_ALLOCATE);
+    if (eagerAllocation) {
+      log.info("Heap limits calculated with eager memory allocation.")
+    } else {
+      log.info("Heap limits calculated with lazy memory allocation.")
+    }
 
     val useOffHeap = flinkConfiguration.getBoolean(
       ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_KEY, false)
 
-    if (useOffHeap && !streamingMode){
+    if (useOffHeap && eagerAllocation){
       val fixedOffHeapSize = flinkConfiguration.getLong(
         ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L)
 

Reply via email to