Repository: flink Updated Branches: refs/heads/master 718a17b28 -> 4f12356eb
http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java index c804830..47255fc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java @@ -27,9 +27,7 @@ import akka.testkit.JavaTestKit; import akka.util.Timeout; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.test.TestingServer; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.StreamingMode; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; @@ -185,7 +183,6 @@ public class JobManagerLeaderElectionTest extends TestLogger { 1, 1L, AkkaUtils.getDefaultTimeout(), - StreamingMode.BATCH_ONLY, leaderElectionService, submittedJobGraphStore, checkpointRecoveryFactory http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java index 0b84474..c490a64 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java @@ -20,7 +20,6 @@ package org.apache.flink.runtime.leaderelection; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.StreamingMode; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -68,7 +67,7 @@ public class LeaderChangeStateCleanupTest extends TestLogger { configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs); configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTM); - cluster = new LeaderElectionRetrievalTestingCluster(configuration, true, false, StreamingMode.BATCH_ONLY); + cluster = new LeaderElectionRetrievalTestingCluster(configuration, true, false); cluster.start(false); // TaskManagers don't have to register at the JobManager cluster.waitForActorsToBeAlive(); // we only wait until all actors are alive http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java index c83f548..c8cf868 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java @@ -20,7 +20,6 @@ package org.apache.flink.runtime.leaderelection; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.StreamingMode; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.testingUtils.TestingCluster; import scala.Option; @@ -39,7 +38,6 @@ public class LeaderElectionRetrievalTestingCluster extends TestingCluster { private final Configuration userConfiguration; private final boolean useSingleActorSystem; - private final StreamingMode streamingMode; public List<TestingLeaderElectionService> leaderElectionServices; public List<TestingLeaderRetrievalService> leaderRetrievalServices; @@ -49,13 +47,11 @@ public class LeaderElectionRetrievalTestingCluster extends TestingCluster { public LeaderElectionRetrievalTestingCluster( Configuration userConfiguration, boolean singleActorSystem, - boolean synchronousDispatcher, - StreamingMode streamingMode) { - super(userConfiguration, singleActorSystem, synchronousDispatcher, streamingMode); + boolean synchronousDispatcher) { + super(userConfiguration, singleActorSystem, synchronousDispatcher); this.userConfiguration = userConfiguration; this.useSingleActorSystem = singleActorSystem; - this.streamingMode = streamingMode; leaderElectionServices = new ArrayList<TestingLeaderElectionService>(); leaderRetrievalServices = new ArrayList<TestingLeaderRetrievalService>(); @@ -67,11 +63,6 @@ public class LeaderElectionRetrievalTestingCluster extends TestingCluster { } @Override - public StreamingMode streamingMode() { - return streamingMode; - } - - @Override public boolean useSingleActorSystem() { return useSingleActorSystem; } http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java index d30db9f..628d756 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java @@ -28,7 +28,6 @@ import akka.testkit.JavaTestKit; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.memory.MemoryType; -import org.apache.flink.runtime.StreamingMode; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.instance.InstanceConnectionInfo; import org.apache.flink.runtime.io.disk.iomanager.IOManager; @@ -76,7 +75,6 @@ public class TaskManagerComponentsStartupShutdownTest { final ActorRef jobManager = JobManager.startJobManagerActors( config, actorSystem, - StreamingMode.BATCH_ONLY, JobManager.class, MemoryArchivist.class)._1(); http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java index 1c15abf..efe0b32 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java @@ -24,7 +24,6 @@ import akka.actor.PoisonPill; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.StreamingMode; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.jobmanager.JobManager; import org.apache.flink.runtime.jobmanager.MemoryArchivist; @@ -91,7 +90,6 @@ public class TaskManagerProcessReapingTest { JobManager.startJobManagerActors( new Configuration(), jmActorSystem, - StreamingMode.BATCH_ONLY, JobManager.class, MemoryArchivist.class); @@ -208,7 +206,7 @@ public class TaskManagerProcessReapingTest { cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4); cfg.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 256); - TaskManager.runTaskManager("localhost", taskManagerPort, cfg, StreamingMode.BATCH_ONLY); + TaskManager.runTaskManager("localhost", taskManagerPort, cfg); // wait forever Object lock = new Object(); http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java index 130dcc5..1f328d8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java @@ -25,7 +25,6 @@ import akka.actor.Terminated; import akka.testkit.JavaTestKit; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.StreamingMode; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.instance.InstanceID; @@ -475,7 +474,6 @@ public class TaskManagerRegistrationTest extends TestLogger { NONE_STRING, // no actor name -> random new Some<LeaderRetrievalService>(new StandaloneLeaderRetrievalService(jobManager.path().toString())), false, // init network stack !!! - StreamingMode.BATCH_ONLY, TaskManager.class); watch(taskManager); @@ -597,7 +595,6 @@ public class TaskManagerRegistrationTest extends TestLogger { actorSystem, NONE_STRING, NONE_STRING, - StreamingMode.BATCH_ONLY, JobManager.class, MemoryArchivist.class)._1(); } http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java index 9cc8170..ac8eadf 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java @@ -24,7 +24,6 @@ import org.apache.commons.io.FileUtils; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.IllegalConfigurationException; -import org.apache.flink.runtime.StreamingMode; import org.junit.Test; import java.io.File; @@ -54,8 +53,11 @@ public class TaskManagerStartupTest { final int port = blocker.getLocalPort(); try { - TaskManager.runTaskManager(localHostName, port, new Configuration(), - StreamingMode.BATCH_ONLY, TaskManager.class); + TaskManager.runTaskManager( + localHostName, + port, + new Configuration(), + TaskManager.class); fail("This should fail with an IOException"); } catch (IOException e) { @@ -103,7 +105,7 @@ public class TaskManagerStartupTest { cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 21656); try { - TaskManager.runTaskManager("localhost", 0, cfg, StreamingMode.BATCH_ONLY); + TaskManager.runTaskManager("localhost", 0, cfg); fail("Should fail synchronously with an exception"); } catch (IOException e) { @@ -136,11 +138,12 @@ public class TaskManagerStartupTest { Configuration cfg = new Configuration(); cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost"); cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 21656); + cfg.setString(ConfigConstants.TASK_MANAGER_MEMORY_PRE_ALLOCATE_KEY, "true"); // something invalid cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -42); try { - TaskManager.runTaskManager("localhost", 0, cfg, StreamingMode.BATCH_ONLY); + TaskManager.runTaskManager("localhost", 0, cfg); fail("Should fail synchronously with an exception"); } catch (IllegalConfigurationException e) { @@ -152,7 +155,7 @@ public class TaskManagerStartupTest { ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE) >> 20; cfg.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memSize); try { - TaskManager.runTaskManager("localhost", 0, cfg, StreamingMode.BATCH_ONLY); + TaskManager.runTaskManager("localhost", 0, cfg); fail("Should fail synchronously with an exception"); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java index 0641493..45be1aa 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java @@ -22,7 +22,6 @@ import akka.actor.ActorRef; import akka.actor.ActorSystem; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.StreamingMode; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.jobmanager.JobManager; import org.apache.flink.runtime.jobmanager.JobManagerMode; @@ -190,8 +189,7 @@ public class JobManagerProcess extends TestJvmProcess { private static final Logger LOG = LoggerFactory.getLogger(JobManagerProcessEntryPoint.class); /** - * Runs the JobManager process in {@link JobManagerMode#CLUSTER} and {@link - * StreamingMode#STREAMING} (can handle both batch and streaming jobs). + * Runs the JobManager process in {@link JobManagerMode#CLUSTER}. * * <p><strong>Required argument</strong>: <code>port</code>. Start the process with * <code>--port PORT</code>. @@ -210,8 +208,7 @@ public class JobManagerProcess extends TestJvmProcess { LOG.info("Configuration: {}.", config); // Run the JobManager - JobManager.runJobManager(config, JobManagerMode.CLUSTER, StreamingMode.STREAMING, - "localhost", port); + JobManager.runJobManager(config, JobManagerMode.CLUSTER, "localhost", port); // Run forever. Forever, ever? Forever, ever! new CountDownLatch(1).await(); http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java index 86449a8..392ccf0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java @@ -21,7 +21,6 @@ package org.apache.flink.runtime.testutils; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.StreamingMode; import org.apache.flink.runtime.taskmanager.TaskManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -95,10 +94,7 @@ public class TaskManagerProcess extends TestJvmProcess { private static final Logger LOG = LoggerFactory.getLogger(TaskManagerProcessEntryPoint.class); /** - * Runs the JobManager process in {@link StreamingMode#STREAMING} (can handle both batch - * and streaming jobs). - * - * <p>All arguments are parsed to a {@link Configuration} and passed to the Taskmanager, + * All arguments are parsed to a {@link Configuration} and passed to the Taskmanager, * for instance: <code>--recovery.mode ZOOKEEPER --recovery.zookeeper.quorum "xyz:123:456"</code>. */ public static void main(String[] args) throws Exception { @@ -117,8 +113,7 @@ public class TaskManagerProcess extends TestJvmProcess { LOG.info("Configuration: {}.", config); // Run the TaskManager - TaskManager.selectNetworkInterfaceAndRunTaskManager( - config, StreamingMode.STREAMING, TaskManager.class); + TaskManager.selectNetworkInterfaceAndRunTaskManager(config, TaskManager.class); // Run forever new CountDownLatch(1).await(); http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala index fa3fc8b..b96411e 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala @@ -24,7 +24,6 @@ import akka.actor._ import akka.testkit.{ImplicitSender, TestKit} import org.apache.flink.configuration.Configuration -import org.apache.flink.runtime.StreamingMode import org.apache.flink.runtime.akka.AkkaUtils import org.apache.flink.runtime.instance._ import org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest.PlainForwardingActor @@ -162,7 +161,6 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll { _system, None, None, - StreamingMode.BATCH_ONLY, classOf[JobManager], classOf[MemoryArchivist]) new AkkaActorGateway(jm, null) http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala index 703d7bf..5eee4e5 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala @@ -24,7 +24,6 @@ import akka.pattern.ask import akka.actor.{ActorRef, Props, ActorSystem} import akka.testkit.CallingThreadDispatcher import org.apache.flink.configuration.{ConfigConstants, Configuration} -import org.apache.flink.runtime.StreamingMode import org.apache.flink.runtime.jobmanager.JobManager import org.apache.flink.runtime.leaderelection.LeaderElectionService import org.apache.flink.runtime.minicluster.FlinkMiniCluster @@ -45,21 +44,13 @@ import scala.concurrent.{Await, Future} class TestingCluster( userConfiguration: Configuration, singleActorSystem: Boolean, - synchronousDispatcher: Boolean, - streamingMode: StreamingMode) + synchronousDispatcher: Boolean) extends FlinkMiniCluster( userConfiguration, - singleActorSystem, - streamingMode) { + singleActorSystem) { - - def this(userConfiguration: Configuration, - singleActorSystem: Boolean, - synchronousDispatcher: Boolean) - = this(userConfiguration, singleActorSystem, synchronousDispatcher, StreamingMode.BATCH_ONLY) - - def this(userConfiguration: Configuration, singleActorSystem: Boolean) - = this(userConfiguration, singleActorSystem, false) + def this(userConfiguration: Configuration, singleActorSystem: Boolean) = + this(userConfiguration, singleActorSystem, false) def this(userConfiguration: Configuration) = this(userConfiguration, true, false) @@ -127,7 +118,6 @@ class TestingCluster( executionRetries, delayBetweenRetries, timeout, - streamingMode, leaderElectionService, submittedJobsGraphs, checkpointRecoveryFactory)) @@ -153,7 +143,6 @@ class TestingCluster( Some(tmActorName), Some(createLeaderRetrievalService), numTaskManagers == 1, - streamingMode, classOf[TestingTaskManager]) } http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala index be72003..7cbff48 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala @@ -20,7 +20,6 @@ package org.apache.flink.runtime.testingUtils import akka.actor.ActorRef import org.apache.flink.configuration.Configuration -import org.apache.flink.runtime.StreamingMode import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager import org.apache.flink.runtime.instance.InstanceManager @@ -43,7 +42,6 @@ import scala.language.postfixOps * @param defaultExecutionRetries * @param delayBetweenRetries * @param timeout - * @param mode */ class TestingJobManager( flinkConfiguration: Configuration, @@ -55,7 +53,6 @@ class TestingJobManager( defaultExecutionRetries: Int, delayBetweenRetries: Long, timeout: FiniteDuration, - mode: StreamingMode, leaderElectionService: LeaderElectionService, submittedJobGraphs : SubmittedJobGraphStore, checkpointRecoveryFactory : CheckpointRecoveryFactory) @@ -69,7 +66,6 @@ class TestingJobManager( defaultExecutionRetries, delayBetweenRetries, timeout, - mode, leaderElectionService, submittedJobGraphs, checkpointRecoveryFactory) http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala index 553b686..02b0cf8 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala @@ -29,10 +29,9 @@ import grizzled.slf4j.Logger import org.apache.flink.configuration.{ConfigConstants, Configuration} import org.apache.flink.runtime.jobmanager.{MemoryArchivist, JobManager} -import org.apache.flink.runtime.{LogMessages, LeaderSessionMessageFilter, FlinkActor, StreamingMode} +import org.apache.flink.runtime.{LogMessages, LeaderSessionMessageFilter, FlinkActor} import org.apache.flink.runtime.akka.AkkaUtils import org.apache.flink.runtime.instance.{AkkaActorGateway, ActorGateway} -import org.apache.flink.runtime.leaderelection.StandaloneLeaderElectionService import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtAnyJobManager import org.apache.flink.runtime.taskmanager.TaskManager @@ -223,7 +222,6 @@ object TestingUtils { None, leaderRetrievalService, useLocalCommunication, - StreamingMode.BATCH_ONLY, classOf[TestingTaskManager] ) @@ -274,7 +272,6 @@ object TestingUtils { actorSystem, Some(JobManager.JOB_MANAGER_NAME), Some(JobManager.ARCHIVE_NAME), - StreamingMode.BATCH_ONLY, classOf[JobManager], classOf[MemoryArchivist]) http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala b/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala index de2f3ec..00e36ab 100644 --- a/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala +++ b/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala @@ -21,7 +21,6 @@ package org.apache.flink.api.scala import java.io._ import java.util.concurrent.TimeUnit -import org.apache.flink.runtime.StreamingMode import org.apache.flink.test.util.{ForkableFlinkMiniCluster, TestBaseUtils} import org.apache.flink.util.TestLogger import org.junit.{AfterClass, BeforeClass, Test, Assert} @@ -261,7 +260,6 @@ object ScalaShellITCase { val cl = TestBaseUtils.startCluster( 1, parallelism, - StreamingMode.BATCH_ONLY, false, false, false) http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/TezProgramTestBase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/TezProgramTestBase.java b/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/TezProgramTestBase.java index 6186a47..eda9d1a 100644 --- a/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/TezProgramTestBase.java +++ b/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/TezProgramTestBase.java @@ -20,7 +20,6 @@ package org.apache.flink.tez.test; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.StreamingMode; import org.apache.flink.test.util.AbstractTestBase; import org.apache.flink.tez.client.LocalTezEnvironment; import org.junit.Assert; @@ -41,7 +40,7 @@ public abstract class TezProgramTestBase extends AbstractTestBase { } public TezProgramTestBase(Configuration config) { - super (config, StreamingMode.BATCH_ONLY); + super (config); } http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java index e6e179c..3550bd9 100644 --- a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java +++ b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java @@ -31,7 +31,6 @@ import org.apache.curator.test.TestingServer; import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.StreamingMode; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader; @@ -185,7 +184,7 @@ public abstract class KafkaTestBase extends TestLogger { flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16); flinkConfig.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "0 s"); - flink = new ForkableFlinkMiniCluster(flinkConfig, false, StreamingMode.STREAMING); + flink = new ForkableFlinkMiniCluster(flinkConfig, false); flink.start(); flinkPort = flink.getLeaderRPCPort(); http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java index e76a2c0..6ad7352 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java @@ -22,7 +22,6 @@ import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.StreamingMode; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; @@ -100,7 +99,7 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment { LOG.info("Running job on local embedded Flink mini cluster"); } - LocalFlinkMiniCluster exec = new LocalFlinkMiniCluster(configuration, true, StreamingMode.STREAMING); + LocalFlinkMiniCluster exec = new LocalFlinkMiniCluster(configuration, true); try { exec.start(); return exec.submitJobAndWait(jobGraph, getConfig().isSysoutLoggingEnabled()); http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java index 4e02f2c..6cd2d41 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java @@ -18,7 +18,6 @@ package org.apache.flink.streaming.util; -import org.apache.flink.runtime.StreamingMode; import org.apache.flink.test.util.ForkableFlinkMiniCluster; import org.apache.flink.test.util.TestBaseUtils; @@ -69,7 +68,7 @@ public class StreamingMultipleProgramsTestBase extends TestBaseUtils { @BeforeClass public static void setup() throws Exception { - cluster = TestBaseUtils.startCluster(1, DEFAULT_PARALLELISM, StreamingMode.STREAMING, false, false, true); + cluster = TestBaseUtils.startCluster(1, DEFAULT_PARALLELISM, false, false, true); TestStreamEnvironment.setAsContext(cluster, DEFAULT_PARALLELISM); } http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java index ce3aa86..50ed1cf 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java @@ -19,7 +19,6 @@ package org.apache.flink.streaming.util; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.StreamingMode; import org.apache.flink.test.util.AbstractTestBase; import org.junit.Test; @@ -34,7 +33,7 @@ public abstract class StreamingProgramTestBase extends AbstractTestBase { public StreamingProgramTestBase() { - super(new Configuration(), StreamingMode.STREAMING); + super(new Configuration()); setParallelism(DEFAULT_PARALLELISM); } http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala index 3342e1e..ee415d1 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala @@ -18,7 +18,6 @@ package org.apache.flink.streaming.api.scala -import org.apache.flink.runtime.StreamingMode import org.apache.flink.streaming.util.TestStreamEnvironment import org.apache.flink.test.util.{ForkableFlinkMiniCluster, TestBaseUtils} import org.scalatest.BeforeAndAfterAll @@ -37,7 +36,6 @@ trait ScalaStreamingMultipleProgramsTestBase TestBaseUtils.startCluster( 1, parallelism, - StreamingMode.STREAMING, false, false, true http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java ---------------------------------------------------------------------- diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java index 005382a..c2da691 100644 --- a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java +++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java @@ -21,7 +21,6 @@ package org.apache.flink.test.util; import com.google.common.base.Charsets; import com.google.common.io.Files; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.StreamingMode; import scala.concurrent.duration.FiniteDuration; import java.io.File; @@ -44,9 +43,6 @@ public abstract class AbstractTestBase extends TestBaseUtils { private final FiniteDuration timeout; - /** Mode (batch-only / streaming) in which to start the system */ - private final StreamingMode streamingMode; - protected int taskManagerNumSlots = 1; protected int numTaskManagers = 1; @@ -55,9 +51,8 @@ public abstract class AbstractTestBase extends TestBaseUtils { protected ForkableFlinkMiniCluster executor; - public AbstractTestBase(Configuration config, StreamingMode streamingMode) { + public AbstractTestBase(Configuration config) { this.config = Objects.requireNonNull(config); - this.streamingMode = Objects.requireNonNull(streamingMode); this.tempFiles = new ArrayList<File>(); timeout = AkkaUtils.getTimeout(config); @@ -71,7 +66,6 @@ public abstract class AbstractTestBase extends TestBaseUtils { this.executor = startCluster( numTaskManagers, taskManagerNumSlots, - streamingMode, false, false, true); http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java ---------------------------------------------------------------------- diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java index f2de650..e639c80 100644 --- a/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java +++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java @@ -22,7 +22,6 @@ import java.util.Comparator; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.StreamingMode; import org.junit.Assert; import org.junit.Test; import org.apache.flink.api.java.tuple.Tuple; @@ -51,7 +50,7 @@ public abstract class JavaProgramTestBase extends AbstractTestBase { } public JavaProgramTestBase(Configuration config) { - super(config, StreamingMode.BATCH_ONLY); + super(config); setTaskManagerNumSlots(parallelism); } http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java ---------------------------------------------------------------------- diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java index 27710d7..38116e2 100644 --- a/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java +++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java @@ -18,7 +18,6 @@ package org.apache.flink.test.util; -import org.apache.flink.runtime.StreamingMode; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.runners.Parameterized; @@ -104,7 +103,6 @@ public class MultipleProgramsTestBase extends TestBaseUtils { cluster = TestBaseUtils.startCluster( 1, DEFAULT_PARALLELISM, - StreamingMode.BATCH_ONLY, startWebServer, false, true); http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java ---------------------------------------------------------------------- diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java index 608000d..2963418 100644 --- a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java +++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java @@ -32,7 +32,6 @@ import org.apache.commons.io.IOUtils; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.StreamingMode; import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.util.TestLogger; @@ -104,7 +103,6 @@ public class TestBaseUtils extends TestLogger { public static ForkableFlinkMiniCluster startCluster( int numTaskManagers, int taskManagerNumSlots, - StreamingMode mode, boolean startWebserver, boolean startZooKeeper, boolean singleActorSystem) throws Exception { @@ -121,12 +119,11 @@ public class TestBaseUtils extends TestLogger { config.setString(ConfigConstants.RECOVERY_MODE, "zookeeper"); } - return startCluster(config, mode, singleActorSystem); + return startCluster(config, singleActorSystem); } public static ForkableFlinkMiniCluster startCluster( Configuration config, - StreamingMode mode, boolean singleActorSystem) throws Exception { logDir = File.createTempFile("TestBaseUtils-logdir", null); @@ -144,7 +141,7 @@ public class TestBaseUtils extends TestLogger { config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 8081); config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString()); - ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(config, singleActorSystem, mode); + ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(config, singleActorSystem); cluster.start(); http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-test-utils/src/main/scala/org/apache/flink/test/util/FlinkTestBase.scala ---------------------------------------------------------------------- diff --git a/flink-test-utils/src/main/scala/org/apache/flink/test/util/FlinkTestBase.scala b/flink-test-utils/src/main/scala/org/apache/flink/test/util/FlinkTestBase.scala index 612fdc5..715bc55 100644 --- a/flink-test-utils/src/main/scala/org/apache/flink/test/util/FlinkTestBase.scala +++ b/flink-test-utils/src/main/scala/org/apache/flink/test/util/FlinkTestBase.scala @@ -18,7 +18,6 @@ package org.apache.flink.test.util -import org.apache.flink.runtime.StreamingMode import org.scalatest.{Suite, BeforeAndAfter} /** Mixin to start and stop a ForkableFlinkMiniCluster automatically for Scala based tests. @@ -57,7 +56,6 @@ trait FlinkTestBase extends BeforeAndAfter { val cl = TestBaseUtils.startCluster( 1, parallelism, - StreamingMode.BATCH_ONLY, false, false, true) http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala index 11eb174..ed336d7 100644 --- a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala +++ b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala @@ -25,7 +25,6 @@ import akka.pattern.Patterns._ import akka.pattern.ask import org.apache.curator.test.TestingCluster import org.apache.flink.configuration.{ConfigConstants, Configuration} -import org.apache.flink.runtime.StreamingMode import org.apache.flink.runtime.akka.AkkaUtils import org.apache.flink.runtime.jobmanager.{JobManager, RecoveryMode} import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster @@ -45,13 +44,9 @@ import scala.concurrent.{Await, Future} * same [[ActorSystem]], otherwise false. */ class ForkableFlinkMiniCluster( - userConfiguration: Configuration, - singleActorSystem: Boolean, - streamingMode: StreamingMode) - extends LocalFlinkMiniCluster(userConfiguration, singleActorSystem, streamingMode) { - - def this(userConfiguration: Configuration, singleActorSystem: Boolean) - = this(userConfiguration, singleActorSystem, StreamingMode.BATCH_ONLY) + userConfiguration: Configuration, + singleActorSystem: Boolean) + extends LocalFlinkMiniCluster(userConfiguration, singleActorSystem) { def this(userConfiguration: Configuration) = this(userConfiguration, true) @@ -103,7 +98,6 @@ class ForkableFlinkMiniCluster( actorSystem, Some(jobManagerName), Some(archiveName), - streamingMode, classOf[TestingJobManager], classOf[TestingMemoryArchivist]) @@ -137,7 +131,6 @@ class ForkableFlinkMiniCluster( Some(TaskManager.TASK_MANAGER_NAME + index), Some(createLeaderRetrievalService()), localExecution, - streamingMode, classOf[TestingTaskManager]) } http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java index 84022f0..cfef35f 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java @@ -24,7 +24,6 @@ import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.StreamingMode; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier; @@ -70,7 +69,7 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger { config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 48); config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "0 ms"); - cluster = new ForkableFlinkMiniCluster(config, false, StreamingMode.STREAMING); + cluster = new ForkableFlinkMiniCluster(config, false); cluster.start(); } http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java index 975582b..2284456 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java @@ -26,7 +26,6 @@ import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.StreamingMode; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier; @@ -71,7 +70,7 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger { config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 48); config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "0 ms"); - cluster = new ForkableFlinkMiniCluster(config, false, StreamingMode.STREAMING); + cluster = new ForkableFlinkMiniCluster(config, false); cluster.start(); } @@ -555,6 +554,22 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger { public void open(Configuration parameters) throws Exception { // this sink can only work with DOP 1 assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks()); + + // it can happen that a checkpoint happens when the complete success state is + // already set. In that case we restart with the final state and would never + // finish because no more elements arrive. + if (windowCounts.size() == numKeys) { + boolean seenAll = true; + for (Integer windowCount: windowCounts.values()) { + if (windowCount != numWindowsExpected) { + seenAll = false; + break; + } + } + if (seenAll) { + throw new SuccessException(); + } + } } @Override @@ -597,8 +612,8 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger { windowCounts.put(value.f0, 1); } - boolean seenAll = true; if (windowCounts.size() == numKeys) { + boolean seenAll = true; for (Integer windowCount: windowCounts.values()) { if (windowCount < numWindowsExpected) { seenAll = false; http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java index e297486..ce3c51a 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java @@ -25,7 +25,6 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.StreamingMode; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier; @@ -36,9 +35,6 @@ import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; -import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer; -import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer; -import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory; import org.apache.flink.test.util.ForkableFlinkMiniCluster; import org.apache.flink.util.Collector; import org.apache.flink.util.TestLogger; @@ -85,7 +81,7 @@ public class WindowCheckpointingITCase extends TestLogger { config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 48); config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "0 ms"); - cluster = new ForkableFlinkMiniCluster(config, false, StreamingMode.STREAMING); + cluster = new ForkableFlinkMiniCluster(config, false); cluster.start(); } http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java index 971a173..7bdfc9d 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java +++ b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java @@ -22,7 +22,6 @@ import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.StreamingMode; import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -54,7 +53,7 @@ public class StreamingScalabilityAndLatency { config.setInteger("taskmanager.net.server.numThreads", 1); config.setInteger("taskmanager.net.client.numThreads", 1); - cluster = new LocalFlinkMiniCluster(config, false, StreamingMode.STREAMING); + cluster = new LocalFlinkMiniCluster(config, false); cluster.start(); runPartitioningProgram(cluster.getLeaderRPCPort(), PARALLELISM); http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractJobManagerProcessFailureRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractJobManagerProcessFailureRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractJobManagerProcessFailureRecoveryITCase.java index ba5ff1c..2f6b762 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractJobManagerProcessFailureRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractJobManagerProcessFailureRecoveryITCase.java @@ -23,7 +23,6 @@ import akka.actor.ActorSystem; import org.apache.commons.io.FileUtils; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.StreamingMode; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.instance.AkkaActorGateway; @@ -168,7 +167,7 @@ public abstract class AbstractJobManagerProcessFailureRecoveryITCase extends Tes TaskManager.startTaskManagerComponentsAndActor( config, tmActorSystem[i], "localhost", Option.<String>empty(), Option.<LeaderRetrievalService>empty(), - false, StreamingMode.STREAMING, TaskManager.class); + false, TaskManager.class); } // Test actor system http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java index c02fa6c..54505ac 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java @@ -26,7 +26,6 @@ import akka.util.Timeout; import org.apache.commons.io.FileUtils; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.StreamingMode; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.jobmanager.JobManager; import org.apache.flink.runtime.jobmanager.MemoryArchivist; @@ -128,7 +127,6 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test ActorRef jmActor = JobManager.startJobManagerActors( jmConfig, jmActorSystem, - StreamingMode.STREAMING, JobManager.class, MemoryArchivist.class)._1(); @@ -379,7 +377,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test cfg.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 100); cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2); - TaskManager.selectNetworkInterfaceAndRunTaskManager(cfg, StreamingMode.STREAMING, TaskManager.class); + TaskManager.selectNetworkInterfaceAndRunTaskManager(cfg, TaskManager.class); // wait forever Object lock = new Object(); http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java index e590067..51a0765 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java @@ -23,7 +23,6 @@ import akka.actor.ActorSystem; import org.apache.commons.io.FileUtils; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.StreamingMode; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.akka.ListeningBehaviour; import org.apache.flink.runtime.instance.ActorGateway; @@ -177,7 +176,7 @@ public class JobManagerCheckpointRecoveryITCase extends TestLogger { TaskManager.startTaskManagerComponentsAndActor( config, taskManagerSystem, "localhost", Option.<String>empty(), Option.<LeaderRetrievalService>empty(), - false, StreamingMode.STREAMING, TaskManager.class); + false, TaskManager.class); { // Initial submission http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java index 6dce370..37e4e38 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java @@ -30,7 +30,6 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.StreamingMode; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.client.JobStatusMessage; import org.apache.flink.runtime.jobmanager.JobManager; @@ -104,7 +103,6 @@ public class ProcessFailureCancelingITCase { ActorRef jmActor = JobManager.startJobManagerActors( jmConfig, jmActorSystem, - StreamingMode.BATCH_ONLY, JobManager.class, MemoryArchivist.class)._1(); http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java index b4a5018..7710f06 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java @@ -25,7 +25,6 @@ import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.StreamingMode; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.test.testdata.WordCountData; import org.apache.flink.test.util.ForkableFlinkMiniCluster; @@ -76,7 +75,7 @@ public class IPv6HostnamesITCase extends TestLogger { conf.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2); conf.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16); - flink = new ForkableFlinkMiniCluster(conf, false, StreamingMode.BATCH_ONLY); + flink = new ForkableFlinkMiniCluster(conf, false); flink.start(); ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(addressString, flink.getLeaderRPCPort()); http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala b/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala index fa70039..3379325 100644 --- a/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala +++ b/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala @@ -20,7 +20,6 @@ package org.apache.flink.yarn import akka.actor.ActorRef import org.apache.flink.configuration.Configuration -import org.apache.flink.runtime.StreamingMode import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager import org.apache.flink.runtime.instance.InstanceManager @@ -61,7 +60,6 @@ class TestingYarnJobManager( defaultExecutionRetries: Int, delayBetweenRetries: Long, timeout: FiniteDuration, - mode: StreamingMode, leaderElectionService: LeaderElectionService, submittedJobGraphs : SubmittedJobGraphStore, checkpointRecoveryFactory : CheckpointRecoveryFactory) @@ -75,7 +73,6 @@ class TestingYarnJobManager( defaultExecutionRetries, delayBetweenRetries, timeout, - mode, leaderElectionService, submittedJobGraphs, checkpointRecoveryFactory) http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java index cda1cab..993d24e 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java @@ -145,7 +145,6 @@ public abstract class FlinkYarnClientBase extends AbstractFlinkYarnClient { private org.apache.flink.configuration.Configuration flinkConfiguration; private boolean detached; - private boolean streamingMode; private String customName = null; @@ -601,7 +600,6 @@ public abstract class FlinkYarnClientBase extends AbstractFlinkYarnClient { appMasterEnv.put(FlinkYarnClient.ENV_CLIENT_USERNAME, UserGroupInformation.getCurrentUser().getShortUserName()); appMasterEnv.put(FlinkYarnClient.ENV_SLOTS, String.valueOf(slots)); appMasterEnv.put(FlinkYarnClient.ENV_DETACHED, String.valueOf(detached)); - appMasterEnv.put(FlinkYarnClient.ENV_STREAMING_MODE, String.valueOf(streamingMode)); if(dynamicPropertiesEncoded != null) { appMasterEnv.put(FlinkYarnClient.ENV_DYNAMIC_PROPERTIES, dynamicPropertiesEncoded); @@ -777,11 +775,6 @@ public abstract class FlinkYarnClientBase extends AbstractFlinkYarnClient { } @Override - public void setStreamingMode(boolean streamingMode) { - this.streamingMode = streamingMode; - } - - @Override public void setName(String name) { if(name == null) { throw new IllegalArgumentException("The passed name is null"); http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java index d05b658..46db430 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java @@ -25,9 +25,7 @@ import java.util.Map; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.taskmanager.TaskManager; -import org.apache.flink.runtime.StreamingMode; import org.apache.flink.runtime.util.EnvironmentInformation; -import org.apache.flink.yarn.YarnTaskManager; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; @@ -52,11 +50,8 @@ public class YarnTaskManagerRunner { // try to parse the command line arguments final Configuration configuration; - final StreamingMode mode; try { - scala.Tuple2<Configuration, StreamingMode> res = TaskManager.parseArgsAndLoadConfig(args); - configuration = res._1(); - mode = res._2(); + configuration = TaskManager.parseArgsAndLoadConfig(args); } catch (Throwable t) { LOG.error(t.getMessage(), t); @@ -94,8 +89,7 @@ public class YarnTaskManagerRunner { @Override public Object run() { try { - TaskManager.selectNetworkInterfaceAndRunTaskManager(configuration, - mode, taskManager); + TaskManager.selectNetworkInterfaceAndRunTaskManager(configuration, taskManager); } catch (Throwable t) { LOG.error("Error while starting the TaskManager", t); http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala index 6bb3852..dcddad8 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala @@ -24,7 +24,6 @@ import java.security.PrivilegedAction import akka.actor.ActorSystem import org.apache.flink.client.CliFrontend import org.apache.flink.configuration.{GlobalConfiguration, Configuration, ConfigConstants} -import org.apache.flink.runtime.StreamingMode import org.apache.flink.runtime.akka.AkkaUtils import org.apache.flink.runtime.jobmanager.{MemoryArchivist, JobManagerMode, JobManager} import org.apache.flink.runtime.util.EnvironmentInformation @@ -93,14 +92,6 @@ abstract class ApplicationMasterBase { val currDir = env.get(Environment.PWD.key()) require(currDir != null, "Current directory unknown.") - val streamingMode = if(ApplicationMasterBase.hasStreamingMode(env)) { - log.info("Starting ApplicationMaster/JobManager in streaming mode") - StreamingMode.STREAMING - } else { - log.info("Starting ApplicationMaster/JobManager in batch only mode") - StreamingMode.BATCH_ONLY - } - // Note that we use the "ownHostname" given by YARN here, to make sure // we use the hostnames given by YARN consistently throughout akka. // for akka "localhost" and "localhost.localdomain" are different actors. @@ -124,7 +115,6 @@ abstract class ApplicationMasterBase { JobManager.startActorSystemAndJobManagerActors( config, JobManagerMode.CLUSTER, - streamingMode, ownHostname, 0, getJobManagerClass, http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala index 4ba80ea..8494f08 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala @@ -33,11 +33,9 @@ import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory import org.apache.flink.runtime.jobgraph.JobStatus import org.apache.flink.runtime.jobmanager.{SubmittedJobGraphStore, JobManager} import org.apache.flink.runtime.leaderelection.LeaderElectionService -import org.apache.flink.runtime.messages.JobManagerMessages.{RequestJobStatus, CurrentJobStatus, -JobNotFound} +import org.apache.flink.runtime.messages.JobManagerMessages.{RequestJobStatus, CurrentJobStatus, JobNotFound} import org.apache.flink.runtime.messages.Messages.Acknowledge import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus -import org.apache.flink.runtime.StreamingMode import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager import org.apache.flink.runtime.instance.InstanceManager import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler} @@ -75,7 +73,6 @@ import scala.util.Try * @param defaultExecutionRetries Number of default execution retries * @param delayBetweenRetries Delay between retries * @param timeout Timeout for futures - * @param mode StreamingMode in which the system shall be started * @param leaderElectionService LeaderElectionService to participate in the leader election */ class YarnJobManager( @@ -88,7 +85,6 @@ class YarnJobManager( defaultExecutionRetries: Int, delayBetweenRetries: Long, timeout: FiniteDuration, - mode: StreamingMode, leaderElectionService: LeaderElectionService, submittedJobGraphs : SubmittedJobGraphStore, checkpointRecoveryFactory : CheckpointRecoveryFactory) @@ -102,7 +98,6 @@ class YarnJobManager( defaultExecutionRetries, delayBetweenRetries, timeout, - mode, leaderElectionService, submittedJobGraphs, checkpointRecoveryFactory) { @@ -603,8 +598,7 @@ class YarnJobManager( hasLog4j, yarnClientUsername, conf, - taskManagerLocalResources, - ApplicationMasterBase.hasStreamingMode(env)) + taskManagerLocalResources) ) context.system.scheduler.scheduleOnce( @@ -665,13 +659,11 @@ class YarnJobManager( hasLog4j: Boolean, yarnClientUsername: String, yarnConf: Configuration, - taskManagerLocalResources: Map[String, LocalResource], - streamingMode: Boolean) - : ContainerLaunchContext = { + taskManagerLocalResources: Map[String, LocalResource]) : ContainerLaunchContext = { log.info("Create container launch context.") val ctx = Records.newRecord(classOf[ContainerLaunchContext]) - val heapLimit = calculateMemoryLimits(memoryLimit, streamingMode) + val heapLimit = calculateMemoryLimits(memoryLimit) val javaOpts = flinkConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "") val tmCommand = new StringBuilder(s"$$JAVA_HOME/bin/java -Xms${heapLimit}m " + @@ -694,13 +686,6 @@ class YarnJobManager( s"${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/taskmanager.out 2> " + s"${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/taskmanager.err" - tmCommand ++= " --streamingMode" - if(streamingMode) { - tmCommand ++= " streaming" - } else { - tmCommand ++= " batch" - } - ctx.setCommands(Collections.singletonList(tmCommand.toString())) log.info(s"Starting TM with command=${tmCommand.toString()}") @@ -736,15 +721,22 @@ class YarnJobManager( /** * Calculate the correct JVM heap memory limit. * @param memoryLimit The maximum memory in megabytes. - * @param streamingMode True if this is a streaming cluster. * @return A Tuple2 containing the heap and the offHeap limit in megabytes. */ - private def calculateMemoryLimits(memoryLimit: Long, streamingMode: Boolean): Long = { + private def calculateMemoryLimits(memoryLimit: Long): Long = { + val eagerAllocation = flinkConfiguration.getBoolean( + ConfigConstants.TASK_MANAGER_MEMORY_PRE_ALLOCATE_KEY, + ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_PRE_ALLOCATE); + if (eagerAllocation) { + log.info("Heap limits calculated with eager memory allocation.") + } else { + log.info("Heap limits calculated with lazy memory allocation.") + } val useOffHeap = flinkConfiguration.getBoolean( ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_KEY, false) - if (useOffHeap && !streamingMode){ + if (useOffHeap && eagerAllocation){ val fixedOffHeapSize = flinkConfiguration.getLong( ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L)