[hotfix] [tests] Rename UtilsTest to YarnFlinkResourceManagerTest.

The test class was misnamed: it tests YarnFlinkResourceManager, not Utils.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a8425e5b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a8425e5b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a8425e5b

Branch: refs/heads/release-1.5
Commit: a8425e5ba6d36f6a8408e47da7f71cf7a0c6eb73
Parents: 4409ba2
Author: gyao <[email protected]>
Authored: Thu Apr 19 11:18:32 2018 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Mon Apr 30 23:25:15 2018 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/yarn/UtilsTest.java   | 244 -------------------
 .../yarn/YarnFlinkResourceManagerTest.java      | 244 +++++++++++++++++++
 2 files changed, 244 insertions(+), 244 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a8425e5b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
deleted file mode 100644
index b7a38b0..0000000
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
-import 
org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted;
-import 
org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager;
-import 
org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.util.TestLogger;
-import org.apache.flink.yarn.messages.NotifyWhenResourcesRegistered;
-import org.apache.flink.yarn.messages.RequestNumberOfRegisteredResources;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.PoisonPill;
-import akka.actor.Props;
-import akka.testkit.JavaTestKit;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.client.api.AMRMClient;
-import org.apache.hadoop.yarn.client.api.NMClient;
-import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.mockito.Matchers;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-
-import scala.Option;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * Tests for {@link Utils}.
- */
-public class UtilsTest extends TestLogger {
-
-       private static ActorSystem system;
-
-       @BeforeClass
-       public static void setup() {
-               system = AkkaUtils.createLocalActorSystem(new Configuration());
-       }
-
-       @AfterClass
-       public static void teardown() {
-               JavaTestKit.shutdownActorSystem(system);
-       }
-
-       @Test
-       public void testYarnFlinkResourceManagerJobManagerLostLeadership() 
throws Exception {
-               new JavaTestKit(system) {{
-
-                       final Deadline deadline = new FiniteDuration(3, 
TimeUnit.MINUTES).fromNow();
-
-                       Configuration flinkConfig = new Configuration();
-                       YarnConfiguration yarnConfig = new YarnConfiguration();
-                       SettableLeaderRetrievalService leaderRetrievalService = 
new SettableLeaderRetrievalService(
-                               null,
-                               null);
-                       String applicationMasterHostName = "localhost";
-                       String webInterfaceURL = "foobar";
-                       ContaineredTaskManagerParameters taskManagerParameters 
= new ContaineredTaskManagerParameters(
-                               1L, 1L, 1L, 1, new HashMap<String, String>());
-                       ContainerLaunchContext taskManagerLaunchContext = 
mock(ContainerLaunchContext.class);
-                       int yarnHeartbeatIntervalMillis = 1000;
-                       int maxFailedContainers = 10;
-                       int numInitialTaskManagers = 5;
-                       final YarnResourceManagerCallbackHandler 
callbackHandler = new YarnResourceManagerCallbackHandler();
-                       AMRMClientAsync<AMRMClient.ContainerRequest> 
resourceManagerClient = mock(AMRMClientAsync.class);
-                       NMClient nodeManagerClient = mock(NMClient.class);
-                       UUID leaderSessionID = UUID.randomUUID();
-
-                       final List<Container> containerList = new ArrayList<>();
-
-                       for (int i = 0; i < numInitialTaskManagers; i++) {
-                               Container mockContainer = mock(Container.class);
-                               when(mockContainer.getId()).thenReturn(
-                                       ContainerId.newInstance(
-                                               
ApplicationAttemptId.newInstance(
-                                                       
ApplicationId.newInstance(System.currentTimeMillis(), 1),
-                                                       1),
-                                               i));
-                               
when(mockContainer.getNodeId()).thenReturn(NodeId.newInstance("container", 
1234));
-                               containerList.add(mockContainer);
-                       }
-
-                       doAnswer(new Answer() {
-                               int counter = 0;
-                               @Override
-                               public Object answer(InvocationOnMock 
invocation) throws Throwable {
-                                       if (counter < containerList.size()) {
-                                               
callbackHandler.onContainersAllocated(
-                                                       
Collections.singletonList(
-                                                               
containerList.get(counter++)
-                                                       ));
-                                       }
-                                       return null;
-                               }
-                       
}).when(resourceManagerClient).addContainerRequest(Matchers.any(AMRMClient.ContainerRequest.class));
-
-                       final CompletableFuture<AkkaActorGateway> 
resourceManagerFuture = new CompletableFuture<>();
-                       final CompletableFuture<AkkaActorGateway> 
leaderGatewayFuture = new CompletableFuture<>();
-
-                       doAnswer(
-                               (InvocationOnMock invocation) -> {
-                                       Container container = (Container) 
invocation.getArguments()[0];
-                                       
resourceManagerFuture.thenCombine(leaderGatewayFuture,
-                                               (resourceManagerGateway, 
leaderGateway) -> {
-                                               resourceManagerGateway.tell(
-                                                       new 
NotifyResourceStarted(YarnFlinkResourceManager.extractResourceID(container)),
-                                                       leaderGateway);
-                                               return null;
-                                               });
-                                       return null;
-                               })
-                               .when(nodeManagerClient)
-                               .startContainer(
-                                       Matchers.any(Container.class),
-                                       
Matchers.any(ContainerLaunchContext.class));
-
-                       ActorRef resourceManager = null;
-                       ActorRef leader1;
-
-                       try {
-                               leader1 = system.actorOf(
-                                       Props.create(
-                                               
TestingUtils.ForwardingActor.class,
-                                               getRef(),
-                                               Option.apply(leaderSessionID)
-                                       ));
-
-                               resourceManager = system.actorOf(
-                                       Props.create(
-                                               
TestingYarnFlinkResourceManager.class,
-                                               flinkConfig,
-                                               yarnConfig,
-                                               leaderRetrievalService,
-                                               applicationMasterHostName,
-                                               webInterfaceURL,
-                                               taskManagerParameters,
-                                               taskManagerLaunchContext,
-                                               yarnHeartbeatIntervalMillis,
-                                               maxFailedContainers,
-                                               numInitialTaskManagers,
-                                               callbackHandler,
-                                               resourceManagerClient,
-                                               nodeManagerClient
-                                       ));
-
-                               
leaderRetrievalService.notifyListener(leader1.path().toString(), 
leaderSessionID);
-
-                               final AkkaActorGateway leader1Gateway = new 
AkkaActorGateway(leader1, leaderSessionID);
-                               final AkkaActorGateway resourceManagerGateway = 
new AkkaActorGateway(resourceManager, leaderSessionID);
-
-                               leaderGatewayFuture.complete(leader1Gateway);
-                               
resourceManagerFuture.complete(resourceManagerGateway);
-
-                               expectMsgClass(deadline.timeLeft(), 
RegisterResourceManager.class);
-
-                               resourceManagerGateway.tell(new 
RegisterResourceManagerSuccessful(leader1, Collections.emptyList()));
-
-                               for (int i = 0; i < containerList.size(); i++) {
-                                       expectMsgClass(deadline.timeLeft(), 
Acknowledge.class);
-                               }
-
-                               Future<Object> taskManagerRegisteredFuture = 
resourceManagerGateway.ask(new 
NotifyWhenResourcesRegistered(numInitialTaskManagers), deadline.timeLeft());
-
-                               Await.ready(taskManagerRegisteredFuture, 
deadline.timeLeft());
-
-                               leaderRetrievalService.notifyListener(null, 
null);
-
-                               
leaderRetrievalService.notifyListener(leader1.path().toString(), 
leaderSessionID);
-
-                               expectMsgClass(deadline.timeLeft(), 
RegisterResourceManager.class);
-
-                               resourceManagerGateway.tell(new 
RegisterResourceManagerSuccessful(leader1, Collections.emptyList()));
-
-                               for (Container container: containerList) {
-                                       resourceManagerGateway.tell(
-                                               new 
NotifyResourceStarted(YarnFlinkResourceManager.extractResourceID(container)),
-                                               leader1Gateway);
-                               }
-
-                               for (int i = 0; i < containerList.size(); i++) {
-                                       expectMsgClass(deadline.timeLeft(), 
Acknowledge.class);
-                               }
-
-                               Future<Object> 
numberOfRegisteredResourcesFuture = 
resourceManagerGateway.ask(RequestNumberOfRegisteredResources.INSTANCE, 
deadline.timeLeft());
-
-                               int numberOfRegisteredResources = (Integer) 
Await.result(numberOfRegisteredResourcesFuture, deadline.timeLeft());
-
-                               assertEquals(numInitialTaskManagers, 
numberOfRegisteredResources);
-                       } finally {
-                               if (resourceManager != null) {
-                                       
resourceManager.tell(PoisonPill.getInstance(), ActorRef.noSender());
-                               }
-                       }
-               }};
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a8425e5b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java
new file mode 100644
index 0000000..10b2ce9
--- /dev/null
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import 
org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted;
+import 
org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager;
+import 
org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.yarn.messages.NotifyWhenResourcesRegistered;
+import org.apache.flink.yarn.messages.RequestNumberOfRegisteredResources;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link YarnFlinkResourceManager}.
+ */
+public class YarnFlinkResourceManagerTest extends TestLogger {
+
+       private static ActorSystem system;
+
+       @BeforeClass
+       public static void setup() {
+               system = AkkaUtils.createLocalActorSystem(new Configuration());
+       }
+
+       @AfterClass
+       public static void teardown() {
+               JavaTestKit.shutdownActorSystem(system);
+       }
+
+       @Test
+       public void testYarnFlinkResourceManagerJobManagerLostLeadership() 
throws Exception {
+               new JavaTestKit(system) {{
+
+                       final Deadline deadline = new FiniteDuration(3, 
TimeUnit.MINUTES).fromNow();
+
+                       Configuration flinkConfig = new Configuration();
+                       YarnConfiguration yarnConfig = new YarnConfiguration();
+                       SettableLeaderRetrievalService leaderRetrievalService = 
new SettableLeaderRetrievalService(
+                               null,
+                               null);
+                       String applicationMasterHostName = "localhost";
+                       String webInterfaceURL = "foobar";
+                       ContaineredTaskManagerParameters taskManagerParameters 
= new ContaineredTaskManagerParameters(
+                               1L, 1L, 1L, 1, new HashMap<String, String>());
+                       ContainerLaunchContext taskManagerLaunchContext = 
mock(ContainerLaunchContext.class);
+                       int yarnHeartbeatIntervalMillis = 1000;
+                       int maxFailedContainers = 10;
+                       int numInitialTaskManagers = 5;
+                       final YarnResourceManagerCallbackHandler 
callbackHandler = new YarnResourceManagerCallbackHandler();
+                       AMRMClientAsync<AMRMClient.ContainerRequest> 
resourceManagerClient = mock(AMRMClientAsync.class);
+                       NMClient nodeManagerClient = mock(NMClient.class);
+                       UUID leaderSessionID = UUID.randomUUID();
+
+                       final List<Container> containerList = new ArrayList<>();
+
+                       for (int i = 0; i < numInitialTaskManagers; i++) {
+                               Container mockContainer = mock(Container.class);
+                               when(mockContainer.getId()).thenReturn(
+                                       ContainerId.newInstance(
+                                               
ApplicationAttemptId.newInstance(
+                                                       
ApplicationId.newInstance(System.currentTimeMillis(), 1),
+                                                       1),
+                                               i));
+                               
when(mockContainer.getNodeId()).thenReturn(NodeId.newInstance("container", 
1234));
+                               containerList.add(mockContainer);
+                       }
+
+                       doAnswer(new Answer() {
+                               int counter = 0;
+                               @Override
+                               public Object answer(InvocationOnMock 
invocation) throws Throwable {
+                                       if (counter < containerList.size()) {
+                                               
callbackHandler.onContainersAllocated(
+                                                       
Collections.singletonList(
+                                                               
containerList.get(counter++)
+                                                       ));
+                                       }
+                                       return null;
+                               }
+                       
}).when(resourceManagerClient).addContainerRequest(Matchers.any(AMRMClient.ContainerRequest.class));
+
+                       final CompletableFuture<AkkaActorGateway> 
resourceManagerFuture = new CompletableFuture<>();
+                       final CompletableFuture<AkkaActorGateway> 
leaderGatewayFuture = new CompletableFuture<>();
+
+                       doAnswer(
+                               (InvocationOnMock invocation) -> {
+                                       Container container = (Container) 
invocation.getArguments()[0];
+                                       
resourceManagerFuture.thenCombine(leaderGatewayFuture,
+                                               (resourceManagerGateway, 
leaderGateway) -> {
+                                               resourceManagerGateway.tell(
+                                                       new 
NotifyResourceStarted(YarnFlinkResourceManager.extractResourceID(container)),
+                                                       leaderGateway);
+                                               return null;
+                                               });
+                                       return null;
+                               })
+                               .when(nodeManagerClient)
+                               .startContainer(
+                                       Matchers.any(Container.class),
+                                       
Matchers.any(ContainerLaunchContext.class));
+
+                       ActorRef resourceManager = null;
+                       ActorRef leader1;
+
+                       try {
+                               leader1 = system.actorOf(
+                                       Props.create(
+                                               
TestingUtils.ForwardingActor.class,
+                                               getRef(),
+                                               Option.apply(leaderSessionID)
+                                       ));
+
+                               resourceManager = system.actorOf(
+                                       Props.create(
+                                               
TestingYarnFlinkResourceManager.class,
+                                               flinkConfig,
+                                               yarnConfig,
+                                               leaderRetrievalService,
+                                               applicationMasterHostName,
+                                               webInterfaceURL,
+                                               taskManagerParameters,
+                                               taskManagerLaunchContext,
+                                               yarnHeartbeatIntervalMillis,
+                                               maxFailedContainers,
+                                               numInitialTaskManagers,
+                                               callbackHandler,
+                                               resourceManagerClient,
+                                               nodeManagerClient
+                                       ));
+
+                               
leaderRetrievalService.notifyListener(leader1.path().toString(), 
leaderSessionID);
+
+                               final AkkaActorGateway leader1Gateway = new 
AkkaActorGateway(leader1, leaderSessionID);
+                               final AkkaActorGateway resourceManagerGateway = 
new AkkaActorGateway(resourceManager, leaderSessionID);
+
+                               leaderGatewayFuture.complete(leader1Gateway);
+                               
resourceManagerFuture.complete(resourceManagerGateway);
+
+                               expectMsgClass(deadline.timeLeft(), 
RegisterResourceManager.class);
+
+                               resourceManagerGateway.tell(new 
RegisterResourceManagerSuccessful(leader1, Collections.emptyList()));
+
+                               for (int i = 0; i < containerList.size(); i++) {
+                                       expectMsgClass(deadline.timeLeft(), 
Acknowledge.class);
+                               }
+
+                               Future<Object> taskManagerRegisteredFuture = 
resourceManagerGateway.ask(new 
NotifyWhenResourcesRegistered(numInitialTaskManagers), deadline.timeLeft());
+
+                               Await.ready(taskManagerRegisteredFuture, 
deadline.timeLeft());
+
+                               leaderRetrievalService.notifyListener(null, 
null);
+
+                               
leaderRetrievalService.notifyListener(leader1.path().toString(), 
leaderSessionID);
+
+                               expectMsgClass(deadline.timeLeft(), 
RegisterResourceManager.class);
+
+                               resourceManagerGateway.tell(new 
RegisterResourceManagerSuccessful(leader1, Collections.emptyList()));
+
+                               for (Container container: containerList) {
+                                       resourceManagerGateway.tell(
+                                               new 
NotifyResourceStarted(YarnFlinkResourceManager.extractResourceID(container)),
+                                               leader1Gateway);
+                               }
+
+                               for (int i = 0; i < containerList.size(); i++) {
+                                       expectMsgClass(deadline.timeLeft(), 
Acknowledge.class);
+                               }
+
+                               Future<Object> 
numberOfRegisteredResourcesFuture = 
resourceManagerGateway.ask(RequestNumberOfRegisteredResources.INSTANCE, 
deadline.timeLeft());
+
+                               int numberOfRegisteredResources = (Integer) 
Await.result(numberOfRegisteredResourcesFuture, deadline.timeLeft());
+
+                               assertEquals(numInitialTaskManagers, 
numberOfRegisteredResources);
+                       } finally {
+                               if (resourceManager != null) {
+                                       
resourceManager.tell(PoisonPill.getInstance(), ActorRef.noSender());
+                               }
+                       }
+               }};
+       }
+}

Reply via email to