http://git-wip-us.apache.org/repos/asf/flink/blob/72468d14/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/SlotManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/SlotManagerTest.java deleted file mode 100644 index 2ee280f..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/SlotManagerTest.java +++ /dev/null @@ -1,540 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.clusterframework; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.clusterframework.types.AllocationID; -import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.clusterframework.types.ResourceProfile; -import org.apache.flink.runtime.clusterframework.types.ResourceSlot; -import org.apache.flink.runtime.clusterframework.types.SlotID; -import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway; -import org.apache.flink.runtime.rpc.resourcemanager.SlotRequest; -import org.apache.flink.runtime.rpc.taskexecutor.SlotStatus; -import org.junit.Before; -import org.junit.Test; - -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; - -public class SlotManagerTest { - - private static final double DEFAULT_TESTING_CPU_CORES = 1.0; - - private static final long DEFAULT_TESTING_MEMORY = 512; - - private static final ResourceProfile DEFAULT_TESTING_PROFILE = - new ResourceProfile(DEFAULT_TESTING_CPU_CORES, DEFAULT_TESTING_MEMORY); - - private static final ResourceProfile DEFAULT_TESTING_BIG_PROFILE = - new ResourceProfile(DEFAULT_TESTING_CPU_CORES * 2, DEFAULT_TESTING_MEMORY * 2); - - private ResourceManagerGateway resourceManagerGateway; - - @Before - public void setUp() { - resourceManagerGateway = mock(ResourceManagerGateway.class); - } - - /** - * Tests that there are no free slots when we request, need to allocate from cluster manager master - */ - @Test - public void testRequestSlotWithoutFreeSlot() { - TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway); - slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE)); - - assertEquals(0, slotManager.getAllocatedSlotCount()); - assertEquals(0, slotManager.getFreeSlotCount()); - assertEquals(1, slotManager.getPendingRequestCount()); - assertEquals(1, slotManager.getAllocatedContainers().size()); - assertEquals(DEFAULT_TESTING_PROFILE, slotManager.getAllocatedContainers().get(0)); - } - - /** - * Tests that there are some free slots when we request, and the request is fulfilled immediately - */ - @Test - public void testRequestSlotWithFreeSlot() { - TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway); - - directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 1); - assertEquals(1, slotManager.getFreeSlotCount()); - - slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE)); - assertEquals(1, slotManager.getAllocatedSlotCount()); - assertEquals(0, slotManager.getFreeSlotCount()); - assertEquals(0, slotManager.getPendingRequestCount()); - assertEquals(0, slotManager.getAllocatedContainers().size()); - } - - /** - * Tests that there are some free slots when we request, but none of them are suitable - */ - @Test - public void testRequestSlotWithoutSuitableSlot() { - TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway); - - directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 2); - assertEquals(2, slotManager.getFreeSlotCount()); - - slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_BIG_PROFILE)); - assertEquals(0, slotManager.getAllocatedSlotCount()); - assertEquals(2, slotManager.getFreeSlotCount()); - assertEquals(1, slotManager.getPendingRequestCount()); - assertEquals(1, slotManager.getAllocatedContainers().size()); - assertEquals(DEFAULT_TESTING_BIG_PROFILE, slotManager.getAllocatedContainers().get(0)); - } - - /** - * Tests that we send duplicated slot request - */ - @Test - public void testDuplicatedSlotRequest() { - TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway); - directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 1); - - SlotRequest request1 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE); - SlotRequest request2 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_BIG_PROFILE); - - slotManager.requestSlot(request1); - slotManager.requestSlot(request2); - slotManager.requestSlot(request2); - slotManager.requestSlot(request1); - - assertEquals(1, slotManager.getAllocatedSlotCount()); - assertEquals(0, slotManager.getFreeSlotCount()); - assertEquals(1, slotManager.getPendingRequestCount()); - assertEquals(1, slotManager.getAllocatedContainers().size()); - assertEquals(DEFAULT_TESTING_BIG_PROFILE, slotManager.getAllocatedContainers().get(0)); - } - - /** - * Tests that we send multiple slot requests - */ - @Test - public void testRequestMultipleSlots() { - TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway); - directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 5); - - // request 3 normal slots - for (int i = 0; i < 3; ++i) { - slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE)); - } - - // request 2 big slots - for (int i = 0; i < 2; ++i) { - slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_BIG_PROFILE)); - } - - // request 1 normal slot again - slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE)); - - assertEquals(4, slotManager.getAllocatedSlotCount()); - assertEquals(1, slotManager.getFreeSlotCount()); - assertEquals(2, slotManager.getPendingRequestCount()); - assertEquals(2, slotManager.getAllocatedContainers().size()); - assertEquals(DEFAULT_TESTING_BIG_PROFILE, slotManager.getAllocatedContainers().get(0)); - assertEquals(DEFAULT_TESTING_BIG_PROFILE, slotManager.getAllocatedContainers().get(1)); - } - - /** - * Tests that a new slot appeared in SlotReport, and we used it to fulfill a pending request - */ - @Test - public void testNewlyAppearedFreeSlotFulfillPendingRequest() { - TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway); - slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE)); - assertEquals(1, slotManager.getPendingRequestCount()); - - SlotID slotId = SlotID.generate(); - SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE); - slotManager.updateSlotStatus(slotStatus); - - assertEquals(1, slotManager.getAllocatedSlotCount()); - assertEquals(0, slotManager.getFreeSlotCount()); - assertEquals(0, slotManager.getPendingRequestCount()); - assertTrue(slotManager.isAllocated(slotId)); - } - - /** - * Tests that a new slot appeared in SlotReport, but we have no pending request - */ - @Test - public void testNewlyAppearedFreeSlot() { - TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway); - SlotID slotId = SlotID.generate(); - SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE); - slotManager.updateSlotStatus(slotStatus); - - assertEquals(0, slotManager.getAllocatedSlotCount()); - assertEquals(1, slotManager.getFreeSlotCount()); - } - - /** - * Tests that a new slot appeared in SlotReport, but it't not suitable for all the pending requests - */ - @Test - public void testNewlyAppearedFreeSlotNotMatchPendingRequests() { - TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway); - slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_BIG_PROFILE)); - assertEquals(1, slotManager.getPendingRequestCount()); - - SlotID slotId = SlotID.generate(); - SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE); - slotManager.updateSlotStatus(slotStatus); - - assertEquals(0, slotManager.getAllocatedSlotCount()); - assertEquals(1, slotManager.getFreeSlotCount()); - assertEquals(1, slotManager.getPendingRequestCount()); - assertFalse(slotManager.isAllocated(slotId)); - } - - /** - * Tests that a new slot appeared in SlotReport, and it's been reported using by some job - */ - @Test - public void testNewlyAppearedInUseSlot() { - TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway); - - SlotID slotId = SlotID.generate(); - SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE, new AllocationID(), new JobID()); - slotManager.updateSlotStatus(slotStatus); - - assertEquals(1, slotManager.getAllocatedSlotCount()); - assertEquals(0, slotManager.getFreeSlotCount()); - assertTrue(slotManager.isAllocated(slotId)); - } - - /** - * Tests that we had a slot in-use, and it's confirmed by SlotReport - */ - @Test - public void testExistingInUseSlotUpdateStatus() { - TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway); - SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE); - slotManager.requestSlot(request); - - // make this slot in use - SlotID slotId = SlotID.generate(); - SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE); - slotManager.updateSlotStatus(slotStatus); - - assertEquals(1, slotManager.getAllocatedSlotCount()); - assertEquals(0, slotManager.getFreeSlotCount()); - assertTrue(slotManager.isAllocated(slotId)); - - // slot status is confirmed - SlotStatus slotStatus2 = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE, - request.getAllocationId(), request.getJobId()); - slotManager.updateSlotStatus(slotStatus2); - - assertEquals(1, slotManager.getAllocatedSlotCount()); - assertEquals(0, slotManager.getFreeSlotCount()); - assertTrue(slotManager.isAllocated(slotId)); - } - - /** - * Tests that we had a slot in-use, but it's empty according to the SlotReport - */ - @Test - public void testExistingInUseSlotAdjustedToEmpty() { - TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway); - SlotRequest request1 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE); - slotManager.requestSlot(request1); - - // make this slot in use - SlotID slotId = SlotID.generate(); - SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE); - slotManager.updateSlotStatus(slotStatus); - - // another request pending - SlotRequest request2 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE); - slotManager.requestSlot(request2); - - assertEquals(1, slotManager.getAllocatedSlotCount()); - assertEquals(0, slotManager.getFreeSlotCount()); - assertEquals(1, slotManager.getPendingRequestCount()); - assertTrue(slotManager.isAllocated(slotId)); - assertTrue(slotManager.isAllocated(request1.getAllocationId())); - - - // but slot is reported empty again, request2 will be fulfilled, request1 will be missing - slotManager.updateSlotStatus(slotStatus); - - assertEquals(1, slotManager.getAllocatedSlotCount()); - assertEquals(0, slotManager.getFreeSlotCount()); - assertEquals(0, slotManager.getPendingRequestCount()); - assertTrue(slotManager.isAllocated(slotId)); - assertTrue(slotManager.isAllocated(request2.getAllocationId())); - } - - /** - * Tests that we had a slot in use, and it's also reported in use by TaskManager, but the allocation - * information didn't match. - */ - @Test - public void testExistingInUseSlotWithDifferentAllocationInfo() { - TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway); - SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE); - slotManager.requestSlot(request); - - // make this slot in use - SlotID slotId = SlotID.generate(); - SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE); - slotManager.updateSlotStatus(slotStatus); - - assertEquals(1, slotManager.getAllocatedSlotCount()); - assertEquals(0, slotManager.getFreeSlotCount()); - assertEquals(0, slotManager.getPendingRequestCount()); - assertTrue(slotManager.isAllocated(slotId)); - assertTrue(slotManager.isAllocated(request.getAllocationId())); - - SlotStatus slotStatus2 = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE, new AllocationID(), new JobID()); - // update slot status with different allocation info - slotManager.updateSlotStatus(slotStatus2); - - // original request is missing and won't be allocated - assertEquals(1, slotManager.getAllocatedSlotCount()); - assertEquals(0, slotManager.getFreeSlotCount()); - assertEquals(0, slotManager.getPendingRequestCount()); - assertTrue(slotManager.isAllocated(slotId)); - assertFalse(slotManager.isAllocated(request.getAllocationId())); - assertTrue(slotManager.isAllocated(slotStatus2.getAllocationID())); - } - - /** - * Tests that we had a free slot, and it's confirmed by SlotReport - */ - @Test - public void testExistingEmptySlotUpdateStatus() { - TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway); - ResourceSlot slot = new ResourceSlot(SlotID.generate(), DEFAULT_TESTING_PROFILE); - slotManager.addFreeSlot(slot); - - SlotStatus slotStatus = new SlotStatus(slot.getSlotId(), DEFAULT_TESTING_PROFILE); - slotManager.updateSlotStatus(slotStatus); - - assertEquals(0, slotManager.getAllocatedSlotCount()); - assertEquals(1, slotManager.getFreeSlotCount()); - assertEquals(0, slotManager.getPendingRequestCount()); - } - - /** - * Tests that we had a free slot, and it's reported in-use by TaskManager - */ - @Test - public void testExistingEmptySlotAdjustedToInUse() { - TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway); - ResourceSlot slot = new ResourceSlot(SlotID.generate(), DEFAULT_TESTING_PROFILE); - slotManager.addFreeSlot(slot); - - SlotStatus slotStatus = new SlotStatus(slot.getSlotId(), DEFAULT_TESTING_PROFILE, - new AllocationID(), new JobID()); - slotManager.updateSlotStatus(slotStatus); - - assertEquals(1, slotManager.getAllocatedSlotCount()); - assertEquals(0, slotManager.getFreeSlotCount()); - assertEquals(0, slotManager.getPendingRequestCount()); - assertTrue(slotManager.isAllocated(slot.getSlotId())); - } - - /** - * Tests that we did some allocation but failed / rejected by TaskManager, request will retry - */ - @Test - public void testSlotAllocationFailedAtTaskManager() { - TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway); - ResourceSlot slot = new ResourceSlot(SlotID.generate(), DEFAULT_TESTING_PROFILE); - slotManager.addFreeSlot(slot); - - SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE); - slotManager.requestSlot(request); - - assertEquals(1, slotManager.getAllocatedSlotCount()); - assertEquals(0, slotManager.getFreeSlotCount()); - assertEquals(0, slotManager.getPendingRequestCount()); - assertTrue(slotManager.isAllocated(slot.getSlotId())); - - slotManager.handleSlotRequestFailedAtTaskManager(request, slot.getSlotId()); - - assertEquals(1, slotManager.getAllocatedSlotCount()); - assertEquals(0, slotManager.getFreeSlotCount()); - assertEquals(0, slotManager.getPendingRequestCount()); - } - - - /** - * Tests that we did some allocation but failed / rejected by TaskManager, and slot is occupied by another request - */ - @Test - public void testSlotAllocationFailedAtTaskManagerOccupiedByOther() { - TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway); - ResourceSlot slot = new ResourceSlot(SlotID.generate(), DEFAULT_TESTING_PROFILE); - slotManager.addFreeSlot(slot); - - SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE); - slotManager.requestSlot(request); - - // slot is set empty by heartbeat - SlotStatus slotStatus = new SlotStatus(slot.getSlotId(), slot.getResourceProfile()); - slotManager.updateSlotStatus(slotStatus); - - // another request took this slot - SlotRequest request2 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE); - slotManager.requestSlot(request2); - - assertEquals(1, slotManager.getAllocatedSlotCount()); - assertEquals(0, slotManager.getFreeSlotCount()); - assertEquals(0, slotManager.getPendingRequestCount()); - assertFalse(slotManager.isAllocated(request.getAllocationId())); - assertTrue(slotManager.isAllocated(request2.getAllocationId())); - - // original request should be pended - slotManager.handleSlotRequestFailedAtTaskManager(request, slot.getSlotId()); - - assertEquals(1, slotManager.getAllocatedSlotCount()); - assertEquals(0, slotManager.getFreeSlotCount()); - assertEquals(1, slotManager.getPendingRequestCount()); - assertFalse(slotManager.isAllocated(request.getAllocationId())); - assertTrue(slotManager.isAllocated(request2.getAllocationId())); - } - - @Test - public void testNotifyTaskManagerFailure() { - TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway); - - ResourceID resource1 = ResourceID.generate(); - ResourceID resource2 = ResourceID.generate(); - - ResourceSlot slot11 = new ResourceSlot(new SlotID(resource1, 1), DEFAULT_TESTING_PROFILE); - ResourceSlot slot12 = new ResourceSlot(new SlotID(resource1, 2), DEFAULT_TESTING_PROFILE); - ResourceSlot slot21 = new ResourceSlot(new SlotID(resource2, 1), DEFAULT_TESTING_PROFILE); - ResourceSlot slot22 = new ResourceSlot(new SlotID(resource2, 2), DEFAULT_TESTING_PROFILE); - - slotManager.addFreeSlot(slot11); - slotManager.addFreeSlot(slot21); - - slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE)); - slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE)); - - assertEquals(2, slotManager.getAllocatedSlotCount()); - assertEquals(0, slotManager.getFreeSlotCount()); - assertEquals(0, slotManager.getPendingRequestCount()); - - slotManager.addFreeSlot(slot12); - slotManager.addFreeSlot(slot22); - - assertEquals(2, slotManager.getAllocatedSlotCount()); - assertEquals(2, slotManager.getFreeSlotCount()); - assertEquals(0, slotManager.getPendingRequestCount()); - - slotManager.notifyTaskManagerFailure(resource2); - - assertEquals(1, slotManager.getAllocatedSlotCount()); - assertEquals(1, slotManager.getFreeSlotCount()); - assertEquals(0, slotManager.getPendingRequestCount()); - - // notify an not exist resource failure - slotManager.notifyTaskManagerFailure(ResourceID.generate()); - - assertEquals(1, slotManager.getAllocatedSlotCount()); - assertEquals(1, slotManager.getFreeSlotCount()); - assertEquals(0, slotManager.getPendingRequestCount()); - } - - // ------------------------------------------------------------------------ - // testing utilities - // ------------------------------------------------------------------------ - - private void directlyProvideFreeSlots( - final SlotManager slotManager, - final ResourceProfile resourceProfile, - final int freeSlotNum) - { - for (int i = 0; i < freeSlotNum; ++i) { - slotManager.addFreeSlot(new ResourceSlot(SlotID.generate(), new ResourceProfile(resourceProfile))); - } - } - - // ------------------------------------------------------------------------ - // testing classes - // ------------------------------------------------------------------------ - - private static class TestingSlotManager extends SlotManager { - - private final List<ResourceProfile> allocatedContainers; - - TestingSlotManager(ResourceManagerGateway resourceManagerGateway) { - super(resourceManagerGateway); - this.allocatedContainers = new LinkedList<>(); - } - - /** - * Choose slot randomly if it matches requirement - * - * @param request The slot request - * @param freeSlots All slots which can be used - * @return The chosen slot or null if cannot find a match - */ - @Override - protected ResourceSlot chooseSlotToUse(SlotRequest request, Map<SlotID, ResourceSlot> freeSlots) { - for (ResourceSlot slot : freeSlots.values()) { - if (slot.isMatchingRequirement(request.getResourceProfile())) { - return slot; - } - } - return null; - } - - /** - * Choose request randomly if offered slot can match its requirement - * - * @param offeredSlot The free slot - * @param pendingRequests All the pending slot requests - * @return The chosen request's AllocationID or null if cannot find a match - */ - @Override - protected SlotRequest chooseRequestToFulfill(ResourceSlot offeredSlot, - Map<AllocationID, SlotRequest> pendingRequests) - { - for (Map.Entry<AllocationID, SlotRequest> pendingRequest : pendingRequests.entrySet()) { - if (offeredSlot.isMatchingRequirement(pendingRequest.getValue().getResourceProfile())) { - return pendingRequest.getValue(); - } - } - return null; - } - - @Override - protected void allocateContainer(ResourceProfile resourceProfile) { - allocatedContainers.add(resourceProfile); - } - - List<ResourceProfile> getAllocatedContainers() { - return allocatedContainers; - } - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/72468d14/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java new file mode 100644 index 0000000..80fa19c --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.registration; + +import akka.dispatch.Futures; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import org.slf4j.LoggerFactory; + +import scala.concurrent.Await; +import scala.concurrent.ExecutionContext$; +import scala.concurrent.Future; +import scala.concurrent.Promise; +import scala.concurrent.duration.FiniteDuration; + +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeoutException; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +/** + * Tests for the generic retrying registration class, validating the failure, retry, and back-off behavior. + */ +public class RetryingRegistrationTest extends TestLogger { + + @Test + public void testSimpleSuccessfulRegistration() throws Exception { + final String testId = "laissez les bon temps roulez"; + final String testEndpointAddress = "<test-address>"; + final UUID leaderId = UUID.randomUUID(); + + // an endpoint that immediately returns success + TestRegistrationGateway testGateway = new TestRegistrationGateway(new TestRegistrationSuccess(testId)); + TestingRpcService rpc = new TestingRpcService(); + + try { + rpc.registerGateway(testEndpointAddress, testGateway); + + TestRetryingRegistration registration = new TestRetryingRegistration(rpc, testEndpointAddress, leaderId); + registration.startRegistration(); + + Future<Tuple2<TestRegistrationGateway, TestRegistrationSuccess>> future = registration.getFuture(); + assertNotNull(future); + + // multiple accesses return the same future + assertEquals(future, registration.getFuture()); + + Tuple2<TestRegistrationGateway, TestRegistrationSuccess> success = + Await.result(future, new FiniteDuration(10, SECONDS)); + + // validate correct invocation and result + assertEquals(testId, success.f1.getCorrelationId()); + assertEquals(leaderId, testGateway.getInvocations().take().leaderId()); + } + finally { + testGateway.stop(); + rpc.stopService(); + } + } + + @Test + public void testPropagateFailures() throws Exception { + final String testExceptionMessage = "testExceptionMessage"; + + // RPC service that fails with exception upon the connection + RpcService rpc = mock(RpcService.class); + when(rpc.connect(anyString(), any(Class.class))).thenThrow(new RuntimeException(testExceptionMessage)); + + TestRetryingRegistration registration = new TestRetryingRegistration(rpc, "testaddress", UUID.randomUUID()); + registration.startRegistration(); + + Future<?> future = registration.getFuture(); + assertTrue(future.failed().isCompleted()); + + assertEquals(testExceptionMessage, future.failed().value().get().get().getMessage()); + } + + @Test + public void testRetryConnectOnFailure() throws Exception { + final String testId = "laissez les bon temps roulez"; + final UUID leaderId = UUID.randomUUID(); + + ExecutorService executor = Executors.newCachedThreadPool(); + TestRegistrationGateway testGateway = new TestRegistrationGateway(new TestRegistrationSuccess(testId)); + + try { + // RPC service that fails upon the first connection, but succeeds on the second + RpcService rpc = mock(RpcService.class); + when(rpc.connect(anyString(), any(Class.class))).thenReturn( + Futures.failed(new Exception("test connect failure")), // first connection attempt fails + Futures.successful(testGateway) // second connection attempt succeeds + ); + when(rpc.getExecutionContext()).thenReturn(ExecutionContext$.MODULE$.fromExecutor(executor)); + + TestRetryingRegistration registration = new TestRetryingRegistration(rpc, "foobar address", leaderId); + registration.startRegistration(); + + Tuple2<TestRegistrationGateway, TestRegistrationSuccess> success = + Await.result(registration.getFuture(), new FiniteDuration(10, SECONDS)); + + // validate correct invocation and result + assertEquals(testId, success.f1.getCorrelationId()); + assertEquals(leaderId, testGateway.getInvocations().take().leaderId()); + } + finally { + testGateway.stop(); + executor.shutdown(); + } + } + + @Test + public void testRetriesOnTimeouts() throws Exception { + final String testId = "rien ne va plus"; + final String testEndpointAddress = "<test-address>"; + final UUID leaderId = UUID.randomUUID(); + + // an endpoint that immediately returns futures with timeouts before returning a successful future + TestRegistrationGateway testGateway = new TestRegistrationGateway( + null, // timeout + null, // timeout + new TestRegistrationSuccess(testId) // success + ); + + TestingRpcService rpc = new TestingRpcService(); + + try { + rpc.registerGateway(testEndpointAddress, testGateway); + + TestRetryingRegistration registration = new TestRetryingRegistration(rpc, testEndpointAddress, leaderId); + + long started = System.nanoTime(); + registration.startRegistration(); + + Future<Tuple2<TestRegistrationGateway, TestRegistrationSuccess>> future = registration.getFuture(); + Tuple2<TestRegistrationGateway, TestRegistrationSuccess> success = + Await.result(future, new FiniteDuration(10, SECONDS)); + + long finished = System.nanoTime(); + long elapsedMillis = (finished - started) / 1000000; + + // validate correct invocation and result + assertEquals(testId, success.f1.getCorrelationId()); + assertEquals(leaderId, testGateway.getInvocations().take().leaderId()); + + // validate that some retry-delay / back-off behavior happened + assertTrue("retries did not properly back off", elapsedMillis >= 3 * TestRetryingRegistration.INITIAL_TIMEOUT); + } + finally { + rpc.stopService(); + testGateway.stop(); + } + } + + @Test + public void testDecline() throws Exception { + final String testId = "qui a coupe le fromage"; + final String testEndpointAddress = "<test-address>"; + final UUID leaderId = UUID.randomUUID(); + + TestingRpcService rpc = new TestingRpcService(); + + TestRegistrationGateway testGateway = new TestRegistrationGateway( + null, // timeout + new RegistrationResponse.Decline("no reason "), + null, // timeout + new TestRegistrationSuccess(testId) // success + ); + + try { + rpc.registerGateway(testEndpointAddress, testGateway); + + TestRetryingRegistration registration = new TestRetryingRegistration(rpc, testEndpointAddress, leaderId); + + long started = System.nanoTime(); + registration.startRegistration(); + + Future<Tuple2<TestRegistrationGateway, TestRegistrationSuccess>> future = registration.getFuture(); + Tuple2<TestRegistrationGateway, TestRegistrationSuccess> success = + Await.result(future, new FiniteDuration(10, SECONDS)); + + long finished = System.nanoTime(); + long elapsedMillis = (finished - started) / 1000000; + + // validate correct invocation and result + assertEquals(testId, success.f1.getCorrelationId()); + assertEquals(leaderId, testGateway.getInvocations().take().leaderId()); + + // validate that some retry-delay / back-off behavior happened + assertTrue("retries did not properly back off", elapsedMillis >= + 2 * TestRetryingRegistration.INITIAL_TIMEOUT + TestRetryingRegistration.DELAY_ON_DECLINE); + } + finally { + testGateway.stop(); + rpc.stopService(); + } + } + + @Test + @SuppressWarnings("unchecked") + public void testRetryOnError() throws Exception { + final String testId = "Petit a petit, l'oiseau fait son nid"; + final String testEndpointAddress = "<test-address>"; + final UUID leaderId = UUID.randomUUID(); + + TestingRpcService rpc = new TestingRpcService(); + + try { + // gateway that upon calls first responds with a failure, then with a success + TestRegistrationGateway testGateway = mock(TestRegistrationGateway.class); + + when(testGateway.registrationCall(any(UUID.class), anyLong())).thenReturn( + Futures.<RegistrationResponse>failed(new Exception("test exception")), + Futures.<RegistrationResponse>successful(new TestRegistrationSuccess(testId))); + + rpc.registerGateway(testEndpointAddress, testGateway); + + TestRetryingRegistration registration = new TestRetryingRegistration(rpc, testEndpointAddress, leaderId); + + long started = System.nanoTime(); + registration.startRegistration(); + + Future<Tuple2<TestRegistrationGateway, TestRegistrationSuccess>> future = registration.getFuture(); + Tuple2<TestRegistrationGateway, TestRegistrationSuccess> success = + Await.result(future, new FiniteDuration(10, SECONDS)); + + long finished = System.nanoTime(); + long elapsedMillis = (finished - started) / 1000000; + + assertEquals(testId, success.f1.getCorrelationId()); + + // validate that some retry-delay / back-off behavior happened + assertTrue("retries did not properly back off", + elapsedMillis >= TestRetryingRegistration.DELAY_ON_ERROR); + } + finally { + rpc.stopService(); + } + } + + @Test + public void testCancellation() throws Exception { + final String testEndpointAddress = "my-test-address"; + final UUID leaderId = UUID.randomUUID(); + + TestingRpcService rpc = new TestingRpcService(); + + try { + Promise<RegistrationResponse> result = Futures.promise(); + + TestRegistrationGateway testGateway = mock(TestRegistrationGateway.class); + when(testGateway.registrationCall(any(UUID.class), anyLong())).thenReturn(result.future()); + + rpc.registerGateway(testEndpointAddress, testGateway); + + TestRetryingRegistration registration = new TestRetryingRegistration(rpc, testEndpointAddress, leaderId); + registration.startRegistration(); + + // cancel and fail the current registration attempt + registration.cancel(); + result.failure(new TimeoutException()); + + // there should not be a second registration attempt + verify(testGateway, atMost(1)).registrationCall(any(UUID.class), anyLong()); + } + finally { + rpc.stopService(); + } + } + + // ------------------------------------------------------------------------ + // test registration + // ------------------------------------------------------------------------ + + private static class TestRegistrationSuccess extends RegistrationResponse.Success { + private static final long serialVersionUID = 5542698790917150604L; + + private final String correlationId; + + private TestRegistrationSuccess(String correlationId) { + this.correlationId = correlationId; + } + + public String getCorrelationId() { + return correlationId; + } + } + + private static class TestRetryingRegistration extends RetryingRegistration<TestRegistrationGateway, TestRegistrationSuccess> { + + // we use shorter timeouts here to speed up the tests + static final long INITIAL_TIMEOUT = 20; + static final long MAX_TIMEOUT = 200; + static final long DELAY_ON_ERROR = 200; + static final long DELAY_ON_DECLINE = 200; + + public TestRetryingRegistration(RpcService rpc, String targetAddress, UUID leaderId) { + super(LoggerFactory.getLogger(RetryingRegistrationTest.class), + rpc, "TestEndpoint", + TestRegistrationGateway.class, + targetAddress, leaderId, + INITIAL_TIMEOUT, MAX_TIMEOUT, DELAY_ON_ERROR, DELAY_ON_DECLINE); + } + + @Override + protected Future<RegistrationResponse> invokeRegistration( + TestRegistrationGateway gateway, UUID leaderId, long timeoutMillis) { + return gateway.registrationCall(leaderId, timeoutMillis); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/72468d14/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java new file mode 100644 index 0000000..431fbe8 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.registration; + +import akka.dispatch.Futures; + +import org.apache.flink.runtime.rpc.TestingGatewayBase; +import org.apache.flink.util.Preconditions; + +import scala.concurrent.Future; + +import java.util.UUID; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +public class TestRegistrationGateway extends TestingGatewayBase { + + private final BlockingQueue<RegistrationCall> invocations; + + private final RegistrationResponse[] responses; + + private int pos; + + public TestRegistrationGateway(RegistrationResponse... responses) { + Preconditions.checkArgument(responses != null && responses.length > 0); + + this.invocations = new LinkedBlockingQueue<>(); + this.responses = responses; + + } + + // ------------------------------------------------------------------------ + + public Future<RegistrationResponse> registrationCall(UUID leaderId, long timeout) { + invocations.add(new RegistrationCall(leaderId, timeout)); + + RegistrationResponse response = responses[pos]; + if (pos < responses.length - 1) { + pos++; + } + + // return a completed future (for a proper value), or one that never completes and will time out (for null) + return response != null ? Futures.successful(response) : this.<RegistrationResponse>futureWithTimeout(timeout); + } + + public BlockingQueue<RegistrationCall> getInvocations() { + return invocations; + } + + // ------------------------------------------------------------------------ + + public static class RegistrationCall { + private final UUID leaderId; + private final long timeout; + + public RegistrationCall(UUID leaderId, long timeout) { + this.leaderId = leaderId; + this.timeout = timeout; + } + + public UUID leaderId() { + return leaderId; + } + + public long timeout() { + return timeout; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/72468d14/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java deleted file mode 100644 index 32c6cac..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java +++ /dev/null @@ -1,156 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.resourcemanager; - -import akka.actor.ActorSystem; -import akka.testkit.JavaTestKit; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.clusterframework.ApplicationStatus; -import org.apache.flink.runtime.clusterframework.messages.StopCluster; -import org.apache.flink.runtime.clusterframework.messages.StopClusterSuccessful; -import org.apache.flink.runtime.instance.ActorGateway; -import org.apache.flink.runtime.messages.Messages; -import org.apache.flink.runtime.testingUtils.TestingMessages; -import org.apache.flink.runtime.testingUtils.TestingUtils; -import org.apache.flink.runtime.testutils.TestingResourceManager; -import org.apache.flink.util.TestLogger; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; -import scala.Option; - - -/** - * Runs tests to ensure that a cluster is shutdown properly. - */ -public class ClusterShutdownITCase extends TestLogger { - - private static ActorSystem system; - - private static Configuration config = new Configuration(); - - @BeforeClass - public static void setup() { - system = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig()); - } - - @AfterClass - public static void teardown() { - JavaTestKit.shutdownActorSystem(system); - } - - /** - * Tests a faked cluster shutdown procedure without the ResourceManager. - */ - @Test - public void testClusterShutdownWithoutResourceManager() { - - new JavaTestKit(system){{ - new Within(duration("30 seconds")) { - @Override - protected void run() { - - ActorGateway me = - TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty()); - - // start job manager which doesn't shutdown the actor system - ActorGateway jobManager = - TestingUtils.createJobManager(system, config, "jobmanager1"); - - // Tell the JobManager to inform us of shutdown actions - jobManager.tell(TestingMessages.getNotifyOfComponentShutdown(), me); - - // Register a TaskManager - ActorGateway taskManager = - TestingUtils.createTaskManager(system, jobManager, config, true, true); - - // Tell the TaskManager to inform us of TaskManager shutdowns - taskManager.tell(TestingMessages.getNotifyOfComponentShutdown(), me); - - - // No resource manager connected - jobManager.tell(new StopCluster(ApplicationStatus.SUCCEEDED, "Shutting down."), me); - - expectMsgAllOf( - new TestingMessages.ComponentShutdown(taskManager.actor()), - new TestingMessages.ComponentShutdown(jobManager.actor()), - StopClusterSuccessful.getInstance() - ); - - }}; - }}; - } - - /** - * Tests a faked cluster shutdown procedure with the ResourceManager. - */ - @Test - public void testClusterShutdownWithResourceManager() { - - new JavaTestKit(system){{ - new Within(duration("30 seconds")) { - @Override - protected void run() { - - ActorGateway me = - TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty()); - - // start job manager which doesn't shutdown the actor system - ActorGateway jobManager = - TestingUtils.createJobManager(system, config, "jobmanager2"); - - // Tell the JobManager to inform us of shutdown actions - jobManager.tell(TestingMessages.getNotifyOfComponentShutdown(), me); - - // Register a TaskManager - ActorGateway taskManager = - TestingUtils.createTaskManager(system, jobManager, config, true, true); - - // Tell the TaskManager to inform us of TaskManager shutdowns - taskManager.tell(TestingMessages.getNotifyOfComponentShutdown(), me); - - // Start resource manager and let it register - ActorGateway resourceManager = - TestingUtils.createResourceManager(system, jobManager.actor(), config); - - // Tell the ResourceManager to inform us of ResourceManager shutdowns - resourceManager.tell(TestingMessages.getNotifyOfComponentShutdown(), me); - - // notify about a resource manager registration at the job manager - resourceManager.tell(new TestingResourceManager.NotifyWhenResourceManagerConnected(), me); - - // Wait for resource manager - expectMsgEquals(Messages.getAcknowledge()); - - - // Shutdown cluster with resource manager connected - jobManager.tell(new StopCluster(ApplicationStatus.SUCCEEDED, "Shutting down."), me); - - expectMsgAllOf( - new TestingMessages.ComponentShutdown(taskManager.actor()), - new TestingMessages.ComponentShutdown(jobManager.actor()), - new TestingMessages.ComponentShutdown(resourceManager.actor()), - StopClusterSuccessful.getInstance() - ); - - }}; - }}; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/72468d14/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java new file mode 100644 index 0000000..5799e62 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.resourcemanager; + +import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; +import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; +import org.apache.flink.runtime.rpc.MainThreadExecutor; +import org.apache.flink.runtime.rpc.RpcEndpoint; +import org.apache.flink.runtime.rpc.RpcGateway; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.StartStoppable; +import org.junit.Assert; +import org.junit.Test; + +import java.util.UUID; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doCallRealMethod; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * resourceManager HA test, including grant leadership and revoke leadership + */ +public class ResourceManagerHATest { + + @Test + public void testGrantAndRevokeLeadership() throws Exception { + // mock a RpcService which will return a special RpcGateway when call its startServer method, the returned RpcGateway directly execute runAsync call + TestingResourceManagerGatewayProxy gateway = mock(TestingResourceManagerGatewayProxy.class); + doCallRealMethod().when(gateway).runAsync(any(Runnable.class)); + + RpcService rpcService = mock(RpcService.class); + when(rpcService.startServer(any(RpcEndpoint.class))).thenReturn(gateway); + + TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService(); + TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices(); + highAvailabilityServices.setResourceManagerLeaderElectionService(leaderElectionService); + + final ResourceManager resourceManager = new ResourceManager(rpcService, highAvailabilityServices); + resourceManager.start(); + // before grant leadership, resourceManager's leaderId is null + Assert.assertNull(resourceManager.getLeaderSessionID()); + final UUID leaderId = UUID.randomUUID(); + leaderElectionService.isLeader(leaderId); + // after grant leadership, resourceManager's leaderId has value + Assert.assertEquals(leaderId, resourceManager.getLeaderSessionID()); + // then revoke leadership, resourceManager's leaderId is null again + leaderElectionService.notLeader(); + Assert.assertNull(resourceManager.getLeaderSessionID()); + } + + private static abstract class TestingResourceManagerGatewayProxy implements MainThreadExecutor, StartStoppable, RpcGateway { + @Override + public void runAsync(Runnable runnable) { + runnable.run(); + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/72468d14/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java deleted file mode 100644 index 0c2ca1a..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java +++ /dev/null @@ -1,162 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.resourcemanager; - -import akka.actor.ActorSystem; -import akka.testkit.JavaTestKit; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.instance.ActorGateway; -import org.apache.flink.runtime.instance.HardwareDescription; -import org.apache.flink.runtime.taskmanager.TaskManagerLocation; -import org.apache.flink.runtime.messages.Messages; -import org.apache.flink.runtime.messages.RegistrationMessages; -import org.apache.flink.runtime.testingUtils.TestingUtils; -import org.apache.flink.runtime.testutils.TestingResourceManager; -import org.apache.flink.util.TestLogger; - -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; - -import scala.Option; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -/** - * It cases which test the interaction of the resource manager with job manager and task managers. - * Runs all tests in one Actor system. - */ -public class ResourceManagerITCase extends TestLogger { - - private static ActorSystem system; - - private static Configuration config = new Configuration(); - - @BeforeClass - public static void setup() { - system = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig()); - } - - @AfterClass - public static void teardown() { - JavaTestKit.shutdownActorSystem(system); - } - - /** - * Tests whether the resource manager connects and reconciles existing task managers. - */ - @Test - public void testResourceManagerReconciliation() { - - new JavaTestKit(system){{ - new Within(duration("10 seconds")) { - @Override - protected void run() { - - ActorGateway jobManager = - TestingUtils.createJobManager(system, config, "ReconciliationTest"); - ActorGateway me = - TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty()); - - // !! no resource manager started !! - - ResourceID resourceID = ResourceID.generate(); - - TaskManagerLocation location = mock(TaskManagerLocation.class); - when(location.getResourceID()).thenReturn(resourceID); - - HardwareDescription resourceProfile = HardwareDescription.extractFromSystem(1_000_000); - - jobManager.tell( - new RegistrationMessages.RegisterTaskManager(resourceID, location, resourceProfile, 1), - me); - - expectMsgClass(RegistrationMessages.AcknowledgeRegistration.class); - - // now start the resource manager - ActorGateway resourceManager = - TestingUtils.createResourceManager(system, jobManager.actor(), config); - - // register at testing job manager to receive a message once a resource manager registers - resourceManager.tell(new TestingResourceManager.NotifyWhenResourceManagerConnected(), me); - - // Wait for resource manager - expectMsgEquals(Messages.getAcknowledge()); - - // check if we registered the task manager resource - resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), me); - - TestingResourceManager.GetRegisteredResourcesReply reply = - expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class); - - assertEquals(1, reply.resources.size()); - assertTrue(reply.resources.contains(resourceID)); - - }}; - }}; - } - - /** - * Tests whether the resource manager gets informed upon TaskManager registration. - */ - @Test - public void testResourceManagerTaskManagerRegistration() { - - new JavaTestKit(system){{ - new Within(duration("30 seconds")) { - @Override - protected void run() { - - ActorGateway jobManager = - TestingUtils.createJobManager(system, config, "RegTest"); - ActorGateway me = - TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty()); - - // start the resource manager - ActorGateway resourceManager = - TestingUtils.createResourceManager(system, jobManager.actor(), config); - - // notify about a resource manager registration at the job manager - resourceManager.tell(new TestingResourceManager.NotifyWhenResourceManagerConnected(), me); - - // Wait for resource manager - expectMsgEquals(Messages.getAcknowledge()); - - // start task manager and wait for registration - ActorGateway taskManager = - TestingUtils.createTaskManager(system, jobManager.actor(), config, true, true); - - // check if we registered the task manager resource - resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), me); - - TestingResourceManager.GetRegisteredResourcesReply reply = - expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class); - - assertEquals(1, reply.resources.size()); - - }}; - }}; - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/72468d14/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java deleted file mode 100644 index 043c81c..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java +++ /dev/null @@ -1,338 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.resourcemanager; - -import akka.actor.ActorSystem; -import akka.testkit.JavaTestKit; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted; -import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager; -import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful; -import org.apache.flink.runtime.clusterframework.messages.RemoveResource; -import org.apache.flink.runtime.clusterframework.messages.ResourceRemoved; -import org.apache.flink.runtime.clusterframework.messages.TriggerRegistrationAtJobManager; -import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.instance.ActorGateway; -import org.apache.flink.runtime.messages.Acknowledge; -import org.apache.flink.runtime.messages.JobManagerMessages; -import org.apache.flink.runtime.testingUtils.TestingUtils; -import org.apache.flink.runtime.testutils.TestingResourceManager; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; -import scala.Option; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -import static org.junit.Assert.*; - -/** - * General tests for the resource manager component. - */ -public class ResourceManagerTest { - - private static ActorSystem system; - - private static ActorGateway fakeJobManager; - private static ActorGateway resourceManager; - - private static Configuration config = new Configuration(); - - @BeforeClass - public static void setup() { - system = AkkaUtils.createLocalActorSystem(config); - } - - @AfterClass - public static void teardown() { - JavaTestKit.shutdownActorSystem(system); - } - - /** - * Tests the registration and reconciliation of the ResourceManager with the JobManager - */ - @Test - public void testJobManagerRegistrationAndReconciliation() { - new JavaTestKit(system){{ - new Within(duration("10 seconds")) { - @Override - protected void run() { - fakeJobManager = TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty()); - resourceManager = TestingUtils.createResourceManager(system, fakeJobManager.actor(), config); - - expectMsgClass(RegisterResourceManager.class); - - List<ResourceID> resourceList = new ArrayList<>(); - resourceList.add(ResourceID.generate()); - resourceList.add(ResourceID.generate()); - resourceList.add(ResourceID.generate()); - - resourceManager.tell( - new RegisterResourceManagerSuccessful(fakeJobManager.actor(), resourceList), - fakeJobManager); - - resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), fakeJobManager); - TestingResourceManager.GetRegisteredResourcesReply reply = - expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class); - - for (ResourceID id : resourceList) { - if (!reply.resources.contains(id)) { - fail("Expected to find all resources that were provided during registration."); - } - } - }}; - }}; - } - - /** - * Tests delayed or erroneous registration of the ResourceManager with the JobManager - */ - @Test - public void testDelayedJobManagerRegistration() { - new JavaTestKit(system){{ - new Within(duration("10 seconds")) { - @Override - protected void run() { - - // set a short timeout for lookups - Configuration shortTimeoutConfig = config.clone(); - shortTimeoutConfig.setString(ConfigConstants.AKKA_LOOKUP_TIMEOUT, "1 s"); - - fakeJobManager = TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty()); - resourceManager = TestingUtils.createResourceManager(system, fakeJobManager.actor(), shortTimeoutConfig); - - // wait for registration message - RegisterResourceManager msg = expectMsgClass(RegisterResourceManager.class); - // give wrong response - getLastSender().tell(new JobManagerMessages.LeaderSessionMessage(null, new Object()), - fakeJobManager.actor()); - - // expect another retry and let it time out - expectMsgClass(RegisterResourceManager.class); - - // wait for next try after timeout - expectMsgClass(RegisterResourceManager.class); - - }}; - }}; - } - - @Test - public void testTriggerReconnect() { - new JavaTestKit(system){{ - new Within(duration("10 seconds")) { - @Override - protected void run() { - - // set a long timeout for lookups such that the test fails in case of timeouts - Configuration shortTimeoutConfig = config.clone(); - shortTimeoutConfig.setString(ConfigConstants.AKKA_LOOKUP_TIMEOUT, "99999 s"); - - fakeJobManager = TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty()); - resourceManager = TestingUtils.createResourceManager(system, fakeJobManager.actor(), shortTimeoutConfig); - - // wait for registration message - RegisterResourceManager msg = expectMsgClass(RegisterResourceManager.class); - // all went well - resourceManager.tell( - new RegisterResourceManagerSuccessful(fakeJobManager.actor(), Collections.<ResourceID>emptyList()), - fakeJobManager); - - // force a reconnect - resourceManager.tell( - new TriggerRegistrationAtJobManager(fakeJobManager.actor()), - fakeJobManager); - - // new registration attempt should come in - expectMsgClass(RegisterResourceManager.class); - - }}; - }}; - } - - /** - * Tests the registration and accounting of resources at the ResourceManager. - */ - @Test - public void testTaskManagerRegistration() { - new JavaTestKit(system){{ - new Within(duration("10 seconds")) { - @Override - protected void run() { - - fakeJobManager = TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty()); - resourceManager = TestingUtils.createResourceManager(system, fakeJobManager.actor(), config); - - // register with JM - expectMsgClass(RegisterResourceManager.class); - resourceManager.tell( - new RegisterResourceManagerSuccessful(fakeJobManager.actor(), Collections.<ResourceID>emptyList()), - fakeJobManager); - - ResourceID resourceID = ResourceID.generate(); - - // Send task manager registration - resourceManager.tell(new NotifyResourceStarted(resourceID), - fakeJobManager); - - expectMsgClass(Acknowledge.class); - - // check for number registration of registered resources - resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), fakeJobManager); - TestingResourceManager.GetRegisteredResourcesReply reply = - expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class); - - assertEquals(1, reply.resources.size()); - - // Send task manager registration again - resourceManager.tell(new NotifyResourceStarted(resourceID), - fakeJobManager); - - expectMsgClass(Acknowledge.class); - - // check for number registration of registered resources - resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), fakeJobManager); - reply = expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class); - - assertEquals(1, reply.resources.size()); - - // Send invalid null resource id to throw an exception during resource registration - resourceManager.tell(new NotifyResourceStarted(null), - fakeJobManager); - - expectMsgClass(Acknowledge.class); - - // check for number registration of registered resources - resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), fakeJobManager); - reply = expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class); - - assertEquals(1, reply.resources.size()); - }}; - }}; - } - - @Test - public void testResourceRemoval() { - new JavaTestKit(system){{ - new Within(duration("10 seconds")) { - @Override - protected void run() { - - fakeJobManager = TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty()); - resourceManager = TestingUtils.createResourceManager(system, fakeJobManager.actor(), config); - - // register with JM - expectMsgClass(RegisterResourceManager.class); - resourceManager.tell( - new RegisterResourceManagerSuccessful(fakeJobManager.actor(), Collections.<ResourceID>emptyList()), - fakeJobManager); - - ResourceID resourceID = ResourceID.generate(); - - // remove unknown resource - resourceManager.tell(new RemoveResource(resourceID), fakeJobManager); - - // Send task manager registration - resourceManager.tell(new NotifyResourceStarted(resourceID), - fakeJobManager); - - expectMsgClass(Acknowledge.class); - - // check for number registration of registered resources - resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), fakeJobManager); - TestingResourceManager.GetRegisteredResourcesReply reply = - expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class); - - assertEquals(1, reply.resources.size()); - assertTrue(reply.resources.contains(resourceID)); - - // remove resource - resourceManager.tell(new RemoveResource(resourceID), fakeJobManager); - - // check for number registration of registered resources - resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), fakeJobManager); - reply = expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class); - - assertEquals(0, reply.resources.size()); - - }}; - }}; - } - - /** - * Tests notification of JobManager about a failed resource. - */ - @Test - public void testResourceFailureNotification() { - new JavaTestKit(system){{ - new Within(duration("10 seconds")) { - @Override - protected void run() { - - fakeJobManager = TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty()); - resourceManager = TestingUtils.createResourceManager(system, fakeJobManager.actor(), config); - - // register with JM - expectMsgClass(RegisterResourceManager.class); - resourceManager.tell( - new RegisterResourceManagerSuccessful(fakeJobManager.actor(), Collections.<ResourceID>emptyList()), - fakeJobManager); - - ResourceID resourceID1 = ResourceID.generate(); - ResourceID resourceID2 = ResourceID.generate(); - - // Send task manager registration - resourceManager.tell(new NotifyResourceStarted(resourceID1), - fakeJobManager); - - expectMsgClass(Acknowledge.class); - - // Send task manager registration - resourceManager.tell(new NotifyResourceStarted(resourceID2), - fakeJobManager); - - expectMsgClass(Acknowledge.class); - - // check for number registration of registered resources - resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), fakeJobManager); - TestingResourceManager.GetRegisteredResourcesReply reply = - expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class); - - assertEquals(2, reply.resources.size()); - assertTrue(reply.resources.contains(resourceID1)); - assertTrue(reply.resources.contains(resourceID2)); - - // fail resources - resourceManager.tell(new TestingResourceManager.FailResource(resourceID1), fakeJobManager); - resourceManager.tell(new TestingResourceManager.FailResource(resourceID2), fakeJobManager); - - ResourceRemoved answer = expectMsgClass(ResourceRemoved.class); - ResourceRemoved answer2 = expectMsgClass(ResourceRemoved.class); - - assertEquals(resourceID1, answer.resourceId()); - assertEquals(resourceID2, answer2.resourceId()); - - }}; - }}; - } -}