[hotfix] [tests] Rename UtilsTest to YarnFlinkResourceManagerTest. Test was misnamed.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a8425e5b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a8425e5b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a8425e5b Branch: refs/heads/release-1.5 Commit: a8425e5ba6d36f6a8408e47da7f71cf7a0c6eb73 Parents: 4409ba2 Author: gyao <[email protected]> Authored: Thu Apr 19 11:18:32 2018 +0200 Committer: Stephan Ewen <[email protected]> Committed: Mon Apr 30 23:25:15 2018 +0200 ---------------------------------------------------------------------- .../java/org/apache/flink/yarn/UtilsTest.java | 244 ------------------- .../yarn/YarnFlinkResourceManagerTest.java | 244 +++++++++++++++++++ 2 files changed, 244 insertions(+), 244 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a8425e5b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java deleted file mode 100644 index b7a38b0..0000000 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java +++ /dev/null @@ -1,244 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.yarn; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; -import org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted; -import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager; -import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful; -import org.apache.flink.runtime.instance.AkkaActorGateway; -import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; -import org.apache.flink.runtime.messages.Acknowledge; -import org.apache.flink.runtime.testingUtils.TestingUtils; -import org.apache.flink.util.TestLogger; -import org.apache.flink.yarn.messages.NotifyWhenResourcesRegistered; -import org.apache.flink.yarn.messages.RequestNumberOfRegisteredResources; - -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.actor.PoisonPill; -import akka.actor.Props; -import akka.testkit.JavaTestKit; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.client.api.AMRMClient; -import org.apache.hadoop.yarn.client.api.NMClient; -import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; -import org.mockito.Matchers; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.UUID; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; - -import scala.Option; -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.Deadline; -import scala.concurrent.duration.FiniteDuration; - -import static org.junit.Assert.assertEquals; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -/** - * Tests for {@link Utils}. - */ -public class UtilsTest extends TestLogger { - - private static ActorSystem system; - - @BeforeClass - public static void setup() { - system = AkkaUtils.createLocalActorSystem(new Configuration()); - } - - @AfterClass - public static void teardown() { - JavaTestKit.shutdownActorSystem(system); - } - - @Test - public void testYarnFlinkResourceManagerJobManagerLostLeadership() throws Exception { - new JavaTestKit(system) {{ - - final Deadline deadline = new FiniteDuration(3, TimeUnit.MINUTES).fromNow(); - - Configuration flinkConfig = new Configuration(); - YarnConfiguration yarnConfig = new YarnConfiguration(); - SettableLeaderRetrievalService leaderRetrievalService = new SettableLeaderRetrievalService( - null, - null); - String applicationMasterHostName = "localhost"; - String webInterfaceURL = "foobar"; - ContaineredTaskManagerParameters taskManagerParameters = new ContaineredTaskManagerParameters( - 1L, 1L, 1L, 1, new HashMap<String, String>()); - ContainerLaunchContext taskManagerLaunchContext = mock(ContainerLaunchContext.class); - int yarnHeartbeatIntervalMillis = 1000; - int maxFailedContainers = 10; - int numInitialTaskManagers = 5; - final YarnResourceManagerCallbackHandler callbackHandler = new YarnResourceManagerCallbackHandler(); - AMRMClientAsync<AMRMClient.ContainerRequest> resourceManagerClient = mock(AMRMClientAsync.class); - NMClient nodeManagerClient = mock(NMClient.class); - UUID leaderSessionID = UUID.randomUUID(); - - final List<Container> containerList = new ArrayList<>(); - - for (int i = 0; i < numInitialTaskManagers; i++) { - Container mockContainer = mock(Container.class); - when(mockContainer.getId()).thenReturn( - ContainerId.newInstance( - ApplicationAttemptId.newInstance( - ApplicationId.newInstance(System.currentTimeMillis(), 1), - 1), - i)); - when(mockContainer.getNodeId()).thenReturn(NodeId.newInstance("container", 1234)); - containerList.add(mockContainer); - } - - doAnswer(new Answer() { - int counter = 0; - @Override - public Object answer(InvocationOnMock invocation) throws Throwable { - if (counter < containerList.size()) { - callbackHandler.onContainersAllocated( - Collections.singletonList( - containerList.get(counter++) - )); - } - return null; - } - }).when(resourceManagerClient).addContainerRequest(Matchers.any(AMRMClient.ContainerRequest.class)); - - final CompletableFuture<AkkaActorGateway> resourceManagerFuture = new CompletableFuture<>(); - final CompletableFuture<AkkaActorGateway> leaderGatewayFuture = new CompletableFuture<>(); - - doAnswer( - (InvocationOnMock invocation) -> { - Container container = (Container) invocation.getArguments()[0]; - resourceManagerFuture.thenCombine(leaderGatewayFuture, - (resourceManagerGateway, leaderGateway) -> { - resourceManagerGateway.tell( - new NotifyResourceStarted(YarnFlinkResourceManager.extractResourceID(container)), - leaderGateway); - return null; - }); - return null; - }) - .when(nodeManagerClient) - .startContainer( - Matchers.any(Container.class), - Matchers.any(ContainerLaunchContext.class)); - - ActorRef resourceManager = null; - ActorRef leader1; - - try { - leader1 = system.actorOf( - Props.create( - TestingUtils.ForwardingActor.class, - getRef(), - Option.apply(leaderSessionID) - )); - - resourceManager = system.actorOf( - Props.create( - TestingYarnFlinkResourceManager.class, - flinkConfig, - yarnConfig, - leaderRetrievalService, - applicationMasterHostName, - webInterfaceURL, - taskManagerParameters, - taskManagerLaunchContext, - yarnHeartbeatIntervalMillis, - maxFailedContainers, - numInitialTaskManagers, - callbackHandler, - resourceManagerClient, - nodeManagerClient - )); - - leaderRetrievalService.notifyListener(leader1.path().toString(), leaderSessionID); - - final AkkaActorGateway leader1Gateway = new AkkaActorGateway(leader1, leaderSessionID); - final AkkaActorGateway resourceManagerGateway = new AkkaActorGateway(resourceManager, leaderSessionID); - - leaderGatewayFuture.complete(leader1Gateway); - resourceManagerFuture.complete(resourceManagerGateway); - - expectMsgClass(deadline.timeLeft(), RegisterResourceManager.class); - - resourceManagerGateway.tell(new RegisterResourceManagerSuccessful(leader1, Collections.emptyList())); - - for (int i = 0; i < containerList.size(); i++) { - expectMsgClass(deadline.timeLeft(), Acknowledge.class); - } - - Future<Object> taskManagerRegisteredFuture = resourceManagerGateway.ask(new NotifyWhenResourcesRegistered(numInitialTaskManagers), deadline.timeLeft()); - - Await.ready(taskManagerRegisteredFuture, deadline.timeLeft()); - - leaderRetrievalService.notifyListener(null, null); - - leaderRetrievalService.notifyListener(leader1.path().toString(), leaderSessionID); - - expectMsgClass(deadline.timeLeft(), RegisterResourceManager.class); - - resourceManagerGateway.tell(new RegisterResourceManagerSuccessful(leader1, Collections.emptyList())); - - for (Container container: containerList) { - resourceManagerGateway.tell( - new NotifyResourceStarted(YarnFlinkResourceManager.extractResourceID(container)), - leader1Gateway); - } - - for (int i = 0; i < containerList.size(); i++) { - expectMsgClass(deadline.timeLeft(), Acknowledge.class); - } - - Future<Object> numberOfRegisteredResourcesFuture = resourceManagerGateway.ask(RequestNumberOfRegisteredResources.INSTANCE, deadline.timeLeft()); - - int numberOfRegisteredResources = (Integer) Await.result(numberOfRegisteredResourcesFuture, deadline.timeLeft()); - - assertEquals(numInitialTaskManagers, numberOfRegisteredResources); - } finally { - if (resourceManager != null) { - resourceManager.tell(PoisonPill.getInstance(), ActorRef.noSender()); - } - } - }}; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/a8425e5b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java new file mode 100644 index 0000000..10b2ce9 --- /dev/null +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted; +import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager; +import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful; +import org.apache.flink.runtime.instance.AkkaActorGateway; +import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.util.TestLogger; +import org.apache.flink.yarn.messages.NotifyWhenResourcesRegistered; +import org.apache.flink.yarn.messages.RequestNumberOfRegisteredResources; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.PoisonPill; +import akka.actor.Props; +import akka.testkit.JavaTestKit; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.NMClient; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Matchers; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import scala.Option; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.Deadline; +import scala.concurrent.duration.FiniteDuration; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests for {@link YarnFlinkResourceManager}. + */ +public class YarnFlinkResourceManagerTest extends TestLogger { + + private static ActorSystem system; + + @BeforeClass + public static void setup() { + system = AkkaUtils.createLocalActorSystem(new Configuration()); + } + + @AfterClass + public static void teardown() { + JavaTestKit.shutdownActorSystem(system); + } + + @Test + public void testYarnFlinkResourceManagerJobManagerLostLeadership() throws Exception { + new JavaTestKit(system) {{ + + final Deadline deadline = new FiniteDuration(3, TimeUnit.MINUTES).fromNow(); + + Configuration flinkConfig = new Configuration(); + YarnConfiguration yarnConfig = new YarnConfiguration(); + SettableLeaderRetrievalService leaderRetrievalService = new SettableLeaderRetrievalService( + null, + null); + String applicationMasterHostName = "localhost"; + String webInterfaceURL = "foobar"; + ContaineredTaskManagerParameters taskManagerParameters = new ContaineredTaskManagerParameters( + 1L, 1L, 1L, 1, new HashMap<String, String>()); + ContainerLaunchContext taskManagerLaunchContext = mock(ContainerLaunchContext.class); + int yarnHeartbeatIntervalMillis = 1000; + int maxFailedContainers = 10; + int numInitialTaskManagers = 5; + final YarnResourceManagerCallbackHandler callbackHandler = new YarnResourceManagerCallbackHandler(); + AMRMClientAsync<AMRMClient.ContainerRequest> resourceManagerClient = mock(AMRMClientAsync.class); + NMClient nodeManagerClient = mock(NMClient.class); + UUID leaderSessionID = UUID.randomUUID(); + + final List<Container> containerList = new ArrayList<>(); + + for (int i = 0; i < numInitialTaskManagers; i++) { + Container mockContainer = mock(Container.class); + when(mockContainer.getId()).thenReturn( + ContainerId.newInstance( + ApplicationAttemptId.newInstance( + ApplicationId.newInstance(System.currentTimeMillis(), 1), + 1), + i)); + when(mockContainer.getNodeId()).thenReturn(NodeId.newInstance("container", 1234)); + containerList.add(mockContainer); + } + + doAnswer(new Answer() { + int counter = 0; + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + if (counter < containerList.size()) { + callbackHandler.onContainersAllocated( + Collections.singletonList( + containerList.get(counter++) + )); + } + return null; + } + }).when(resourceManagerClient).addContainerRequest(Matchers.any(AMRMClient.ContainerRequest.class)); + + final CompletableFuture<AkkaActorGateway> resourceManagerFuture = new CompletableFuture<>(); + final CompletableFuture<AkkaActorGateway> leaderGatewayFuture = new CompletableFuture<>(); + + doAnswer( + (InvocationOnMock invocation) -> { + Container container = (Container) invocation.getArguments()[0]; + resourceManagerFuture.thenCombine(leaderGatewayFuture, + (resourceManagerGateway, leaderGateway) -> { + resourceManagerGateway.tell( + new NotifyResourceStarted(YarnFlinkResourceManager.extractResourceID(container)), + leaderGateway); + return null; + }); + return null; + }) + .when(nodeManagerClient) + .startContainer( + Matchers.any(Container.class), + Matchers.any(ContainerLaunchContext.class)); + + ActorRef resourceManager = null; + ActorRef leader1; + + try { + leader1 = system.actorOf( + Props.create( + TestingUtils.ForwardingActor.class, + getRef(), + Option.apply(leaderSessionID) + )); + + resourceManager = system.actorOf( + Props.create( + TestingYarnFlinkResourceManager.class, + flinkConfig, + yarnConfig, + leaderRetrievalService, + applicationMasterHostName, + webInterfaceURL, + taskManagerParameters, + taskManagerLaunchContext, + yarnHeartbeatIntervalMillis, + maxFailedContainers, + numInitialTaskManagers, + callbackHandler, + resourceManagerClient, + nodeManagerClient + )); + + leaderRetrievalService.notifyListener(leader1.path().toString(), leaderSessionID); + + final AkkaActorGateway leader1Gateway = new AkkaActorGateway(leader1, leaderSessionID); + final AkkaActorGateway resourceManagerGateway = new AkkaActorGateway(resourceManager, leaderSessionID); + + leaderGatewayFuture.complete(leader1Gateway); + resourceManagerFuture.complete(resourceManagerGateway); + + expectMsgClass(deadline.timeLeft(), RegisterResourceManager.class); + + resourceManagerGateway.tell(new RegisterResourceManagerSuccessful(leader1, Collections.emptyList())); + + for (int i = 0; i < containerList.size(); i++) { + expectMsgClass(deadline.timeLeft(), Acknowledge.class); + } + + Future<Object> taskManagerRegisteredFuture = resourceManagerGateway.ask(new NotifyWhenResourcesRegistered(numInitialTaskManagers), deadline.timeLeft()); + + Await.ready(taskManagerRegisteredFuture, deadline.timeLeft()); + + leaderRetrievalService.notifyListener(null, null); + + leaderRetrievalService.notifyListener(leader1.path().toString(), leaderSessionID); + + expectMsgClass(deadline.timeLeft(), RegisterResourceManager.class); + + resourceManagerGateway.tell(new RegisterResourceManagerSuccessful(leader1, Collections.emptyList())); + + for (Container container: containerList) { + resourceManagerGateway.tell( + new NotifyResourceStarted(YarnFlinkResourceManager.extractResourceID(container)), + leader1Gateway); + } + + for (int i = 0; i < containerList.size(); i++) { + expectMsgClass(deadline.timeLeft(), Acknowledge.class); + } + + Future<Object> numberOfRegisteredResourcesFuture = resourceManagerGateway.ask(RequestNumberOfRegisteredResources.INSTANCE, deadline.timeLeft()); + + int numberOfRegisteredResources = (Integer) Await.result(numberOfRegisteredResourcesFuture, deadline.timeLeft()); + + assertEquals(numInitialTaskManagers, numberOfRegisteredResources); + } finally { + if (resourceManager != null) { + resourceManager.tell(PoisonPill.getInstance(), ActorRef.noSender()); + } + } + }}; + } +}
