This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 87c3933b44e38242e80717758261fd34487d185c Author: Chesnay Schepler <ches...@apache.org> AuthorDate: Mon Feb 22 08:30:24 2021 +0100 [FLINK-21399][coordination][tests] Refactor registerSlotsRequiredForJobExecution --- .../jobmaster/JobMasterQueryableStateTest.java | 41 +----------- .../runtime/jobmaster/JobMasterTestUtils.java | 74 ++++++++++++++++++++++ 2 files changed, 77 insertions(+), 38 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterQueryableStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterQueryableStateTest.java index fa53ebb..0de4ec9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterQueryableStateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterQueryableStateTest.java @@ -22,8 +22,6 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.time.Time; import org.apache.flink.queryablestate.KvStateID; -import org.apache.flink.runtime.clusterframework.types.AllocationID; -import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobType; import org.apache.flink.runtime.jobgraph.JobVertex; @@ -37,11 +35,6 @@ import org.apache.flink.runtime.query.UnknownKvStateLocation; import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.rpc.TestingRpcService; import org.apache.flink.runtime.state.KeyGroupRange; -import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; -import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder; -import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; -import org.apache.flink.runtime.taskmanager.LocalUnresolvedTaskManagerLocation; -import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.TestLogger; @@ -53,10 +46,7 @@ import org.junit.Test; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; -import java.util.Collection; import java.util.concurrent.ExecutionException; -import java.util.stream.Collectors; -import java.util.stream.IntStream; import static org.apache.flink.core.testutils.FlinkMatchers.containsCause; import static org.hamcrest.CoreMatchers.either; @@ -318,35 +308,10 @@ public class JobMasterQueryableStateTest extends TestLogger { } } - private void registerSlotsRequiredForJobExecution(JobMasterGateway jobMasterGateway) + private static void registerSlotsRequiredForJobExecution(JobMasterGateway jobMasterGateway) throws ExecutionException, InterruptedException { - - final TaskExecutorGateway taskExecutorGateway = - new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway(); - final UnresolvedTaskManagerLocation unresolvedTaskManagerLocation = - new LocalUnresolvedTaskManagerLocation(); - - rpcService.registerGateway(taskExecutorGateway.getAddress(), taskExecutorGateway); - - jobMasterGateway - .registerTaskManager( - taskExecutorGateway.getAddress(), - unresolvedTaskManagerLocation, - testingTimeout) - .get(); - - Collection<SlotOffer> slotOffers = - IntStream.range(0, PARALLELISM) - .mapToObj( - index -> - new SlotOffer( - new AllocationID(), index, ResourceProfile.ANY)) - .collect(Collectors.toList()); - - jobMasterGateway - .offerSlots( - unresolvedTaskManagerLocation.getResourceID(), slotOffers, testingTimeout) - .get(); + JobMasterTestUtils.registerTaskExecutorAndOfferSlots( + rpcService, jobMasterGateway, PARALLELISM, testingTimeout); } private static void registerKvState( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTestUtils.java new file mode 100644 index 0000000..cd9609f --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTestUtils.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; +import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder; +import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; +import org.apache.flink.runtime.taskmanager.LocalUnresolvedTaskManagerLocation; +import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation; + +import java.util.Collection; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** JobMaster-related test utils. */ +public class JobMasterTestUtils { + + public static void registerTaskExecutorAndOfferSlots( + TestingRpcService rpcService, + JobMasterGateway jobMasterGateway, + int numSlots, + Time testingTimeout) + throws ExecutionException, InterruptedException { + + final TaskExecutorGateway taskExecutorGateway = + new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway(); + final UnresolvedTaskManagerLocation unresolvedTaskManagerLocation = + new LocalUnresolvedTaskManagerLocation(); + + rpcService.registerGateway(taskExecutorGateway.getAddress(), taskExecutorGateway); + + jobMasterGateway + .registerTaskManager( + taskExecutorGateway.getAddress(), + unresolvedTaskManagerLocation, + testingTimeout) + .get(); + + Collection<SlotOffer> slotOffers = + IntStream.range(0, numSlots) + .mapToObj( + index -> + new SlotOffer( + new AllocationID(), index, ResourceProfile.ANY)) + .collect(Collectors.toList()); + + jobMasterGateway + .offerSlots( + unresolvedTaskManagerLocation.getResourceID(), slotOffers, testingTimeout) + .get(); + } + + private JobMasterTestUtils() {} +}