Repository: flink Updated Branches: refs/heads/master 034d9a3ab -> 2dfd463e2
http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java index 236e922..e4d0f65 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java @@ -41,12 +41,10 @@ import org.apache.flink.runtime.jobmanager.SubmittedJobGraph; import org.apache.flink.runtime.leaderelection.TestingListener; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.messages.JobManagerMessages; -import org.apache.flink.runtime.messages.JobManagerMessages.JobStatusResponse; import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage; import org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob; import org.apache.flink.runtime.taskmanager.TaskManager; import org.apache.flink.runtime.testingUtils.TestingCluster; -import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable; import org.apache.flink.runtime.testutils.CommonTestUtils; import org.apache.flink.runtime.testutils.JobManagerActorTestUtils; @@ -70,7 +68,6 @@ import scala.concurrent.duration.FiniteDuration; import java.io.File; import java.io.IOException; import java.util.Collection; -import java.util.List; import java.util.Queue; import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; @@ -168,91 +165,6 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger { } /** - * Tests that submissions to non-leaders are handled. - */ - @Test - public void testSubmitJobToNonLeader() throws Exception { - Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig( - ZooKeeper.getConnectString(), FileStateBackendBasePath.getPath()); - - // Configure the cluster - config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 2); - config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1); - - TestingCluster flink = new TestingCluster(config, false, false); - - try { - final Deadline deadline = TestTimeOut.fromNow(); - - // Start the JobManager and TaskManager - flink.start(true); - - JobGraph jobGraph = createBlockingJobGraph(); - - List<ActorRef> bothJobManagers = flink.getJobManagersAsJava(); - - ActorGateway leadingJobManager = flink.getLeaderGateway(deadline.timeLeft()); - - ActorGateway nonLeadingJobManager; - if (bothJobManagers.get(0).equals(leadingJobManager.actor())) { - nonLeadingJobManager = new AkkaActorGateway(bothJobManagers.get(1), null); - } - else { - nonLeadingJobManager = new AkkaActorGateway(bothJobManagers.get(0), null); - } - - log.info("Leading job manager: " + leadingJobManager); - log.info("Non-leading job manager: " + nonLeadingJobManager); - - // Submit the job - nonLeadingJobManager.tell(new SubmitJob(jobGraph, ListeningBehaviour.DETACHED)); - - log.info("Submitted job graph to " + nonLeadingJobManager); - - // Wait for the job to start. We are asking the *leading** JM here although we've - // submitted the job to the non-leading JM. This is the behaviour under test. - JobManagerActorTestUtils.waitForJobStatus(jobGraph.getJobID(), JobStatus.RUNNING, - leadingJobManager, deadline.timeLeft()); - - log.info("Wait that the non-leader removes the submitted job."); - - // Make sure that the **non-leading** JM has actually removed the job graph from its - // local state. - boolean success = false; - while (!success && deadline.hasTimeLeft()) { - JobStatusResponse jobStatusResponse = JobManagerActorTestUtils.requestJobStatus( - jobGraph.getJobID(), nonLeadingJobManager, deadline.timeLeft()); - - if (jobStatusResponse instanceof JobManagerMessages.JobNotFound) { - success = true; - } - else { - log.info(((JobManagerMessages.CurrentJobStatus)jobStatusResponse).status().toString()); - Thread.sleep(100); - } - } - - if (!success) { - fail("Non-leading JM was still holding reference to the job graph."); - } - - Future<Object> jobRemoved = leadingJobManager.ask( - new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()), - deadline.timeLeft()); - - leadingJobManager.tell(new JobManagerMessages.CancelJob(jobGraph.getJobID())); - - Await.ready(jobRemoved, deadline.timeLeft()); - } - finally { - flink.shutdown(); - } - - // Verify that everything is clean - verifyCleanRecoveryState(config); - } - - /** * Tests that clients receive updates after recovery by a new leader. */ @Test http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java index 440cfff..e38fab4 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java @@ -24,6 +24,7 @@ import akka.testkit.JavaTestKit; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.instance.AkkaActorGateway; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.messages.JobManagerMessages; @@ -75,7 +76,7 @@ public class LocalFlinkMiniClusterITCase { final ActorGateway jmGateway = miniCluster.getLeaderGateway(TestingUtils.TESTING_DURATION()); new JavaTestKit(system) {{ - final ActorGateway selfGateway = new AkkaActorGateway(getRef(), null); + final ActorGateway selfGateway = new AkkaActorGateway(getRef(), HighAvailabilityServices.DEFAULT_LEADER_ID); new Within(TestingUtils.TESTING_DURATION()) { http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java index 8534ba8..a09c5b2 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java @@ -93,7 +93,9 @@ public class UtilsTest extends TestLogger { Configuration flinkConfig = new Configuration(); YarnConfiguration yarnConfig = new YarnConfiguration(); - TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService(); + TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService( + null, + null); String applicationMasterHostName = "localhost"; String webInterfaceURL = "foobar"; ContaineredTaskManagerParameters taskManagerParameters = new ContaineredTaskManagerParameters(
