This is an automated email from the ASF dual-hosted git repository. slfan1989 pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push: new 931fa493637b YARN-11613. [Federation] Router CLI Supports Delete SubClusterPolicyConfiguration Of Queues. (#6295) Contributed by Shilun Fan. 931fa493637b is described below commit 931fa493637bd6ccebc4734d655c8f3699ea5aa7 Author: slfan1989 <55643692+slfan1...@users.noreply.github.com> AuthorDate: Tue Dec 5 23:32:03 2023 +0800 YARN-11613. [Federation] Router CLI Supports Delete SubClusterPolicyConfiguration Of Queues. (#6295) Contributed by Shilun Fan. Reviewed-by: Inigo Goiri <inigo...@apache.org> Signed-off-by: Shilun Fan <slfan1...@apache.org> --- .../api/ResourceManagerAdministrationProtocol.java | 15 +++ .../DeleteFederationQueuePoliciesRequest.java | 62 +++++++++ .../DeleteFederationQueuePoliciesResponse.java | 51 +++++++ .../resourcemanager_administration_protocol.proto | 1 + ...arn_server_resourcemanager_service_protos.proto | 8 ++ .../apache/hadoop/yarn/client/cli/RouterCLI.java | 60 ++++++++- .../hadoop/yarn/client/cli/TestRouterCLI.java | 24 +++- ...eManagerAdministrationProtocolPBClientImpl.java | 19 +++ ...ManagerAdministrationProtocolPBServiceImpl.java | 23 ++++ ...DeleteFederationQueuePoliciesRequestPBImpl.java | 147 +++++++++++++++++++++ ...eleteFederationQueuePoliciesResponsePBImpl.java | 96 ++++++++++++++ .../federation/store/FederationPolicyStore.java | 12 ++ .../store/impl/MemoryFederationStateStore.java | 15 +++ .../store/impl/SQLFederationStateStore.java | 30 +++++ .../store/impl/ZookeeperFederationStateStore.java | 35 +++++ ...eteSubClusterPoliciesConfigurationsRequest.java | 61 +++++++++ ...teSubClusterPoliciesConfigurationsResponse.java | 35 +++++ ...ClusterPoliciesConfigurationsRequestPBImpl.java | 146 ++++++++++++++++++++ ...lusterPoliciesConfigurationsResponsePBImpl.java | 75 +++++++++++ .../store/sql/FederationQueryRunner.java | 26 +++- .../utils/FederationPolicyStoreInputValidator.java | 19 +++ .../utils/FederationStateStoreFacade.java | 10 ++ .../main/proto/yarn_server_federation_protos.proto | 7 + .../yarn/server/MockResourceManagerFacade.java | 8 ++ .../store/impl/FederationStateStoreBaseTest.java | 50 +++++++ .../yarn/server/resourcemanager/AdminService.java | 10 ++ .../federation/FederationStateStoreService.java | 12 ++ .../hadoop/yarn/server/router/RouterMetrics.java | 32 +++++ .../rmadmin/DefaultRMAdminRequestInterceptor.java | 8 ++ .../rmadmin/FederationRMAdminInterceptor.java | 42 ++++++ .../router/rmadmin/RouterRMAdminService.java | 9 ++ .../yarn/server/router/TestRouterMetrics.java | 36 +++++ .../PassThroughRMAdminRequestInterceptor.java | 8 ++ .../rmadmin/TestFederationRMAdminInterceptor.java | 43 +++++- 34 files changed, 1230 insertions(+), 5 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java index 0b4f1ad6429f..edcce9000bae 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java @@ -66,6 +66,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePol import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePoliciesResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationApplicationRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationApplicationResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationQueuePoliciesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationQueuePoliciesResponse; @Private public interface ResourceManagerAdministrationProtocol extends GetUserMappingsProtocol { @@ -233,4 +235,17 @@ public interface ResourceManagerAdministrationProtocol extends GetUserMappingsPr @Idempotent DeleteFederationApplicationResponse deleteFederationApplication( DeleteFederationApplicationRequest request) throws YarnException, IOException; + + /** + * In YARN-Federation mode, this method provides a way to delete queue weight policies. + * + * @param request DeleteFederationQueuePoliciesRequest Request. + * @return Response from DeleteFederationQueuePolicies. + * @throws YarnException exceptions from yarn servers. + * @throws IOException if an IO error occurred. + */ + @Private + @Idempotent + DeleteFederationQueuePoliciesResponse deleteFederationPoliciesByQueues( + DeleteFederationQueuePoliciesRequest request) throws YarnException, IOException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DeleteFederationQueuePoliciesRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DeleteFederationQueuePoliciesRequest.java new file mode 100644 index 000000000000..d744501ad3d8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DeleteFederationQueuePoliciesRequest.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.util.Records; + +import java.util.List; + +/** + * This class is used for handling queue policy deletion requests, + * which include the queues that need to be removed. + */ +@Private +@Unstable +public abstract class DeleteFederationQueuePoliciesRequest { + + @Private + @Unstable + public static DeleteFederationQueuePoliciesRequest newInstance( + List<String> queues) { + DeleteFederationQueuePoliciesRequest request = + Records.newRecord(DeleteFederationQueuePoliciesRequest.class); + request.setQueues(queues); + return request; + } + + /** + * To obtain the list of queues to be deleted. + * + * @return list of queue names. + */ + @Public + @Unstable + public abstract List<String> getQueues(); + + /** + * Set the list of queues to be deleted. + * + * @param queues list of queue names. + */ + @Private + @Unstable + public abstract void setQueues(List<String> queues); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DeleteFederationQueuePoliciesResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DeleteFederationQueuePoliciesResponse.java new file mode 100644 index 000000000000..b04503f3d88b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DeleteFederationQueuePoliciesResponse.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.util.Records; + +/** + * This class is utilized for responding to queue deletion requests + * and includes the provision of return information. + */ +@Private +@Unstable +public abstract class DeleteFederationQueuePoliciesResponse { + + public static DeleteFederationQueuePoliciesResponse newInstance() { + return Records.newRecord(DeleteFederationQueuePoliciesResponse.class); + } + + public static DeleteFederationQueuePoliciesResponse newInstance(String msg) { + DeleteFederationQueuePoliciesResponse response = + Records.newRecord(DeleteFederationQueuePoliciesResponse.class); + response.setMessage(msg); + return response; + } + + @Public + @Unstable + public abstract String getMessage(); + + @Public + @Unstable + public abstract void setMessage(String msg); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/resourcemanager_administration_protocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/resourcemanager_administration_protocol.proto index f0f3fa563f59..8150b35a9d35 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/resourcemanager_administration_protocol.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/resourcemanager_administration_protocol.proto @@ -52,4 +52,5 @@ service ResourceManagerAdministrationProtocolService { rpc batchSaveFederationQueuePolicies(BatchSaveFederationQueuePoliciesRequestProto) returns (BatchSaveFederationQueuePoliciesResponseProto); rpc listFederationQueuePolicies(QueryFederationQueuePoliciesRequestProto) returns (QueryFederationQueuePoliciesResponseProto); rpc deleteFederationApplication(DeleteFederationApplicationRequestProto) returns (DeleteFederationApplicationResponseProto); + rpc deleteFederationPoliciesByQueues(DeleteFederationQueuePoliciesRequestProto) returns (DeleteFederationQueuePoliciesResponseProto); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto index ba18e514a7df..7e22c88c4ab0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto @@ -211,6 +211,14 @@ message DeleteFederationApplicationResponseProto { optional string message = 1; } +message DeleteFederationQueuePoliciesRequestProto { + repeated string queues = 1; +} + +message DeleteFederationQueuePoliciesResponseProto { + required string message = 1; +} + ////////////////////////////////////////////////////////////////// ///////////// RM Failover related records //////////////////////// ////////////////////////////////////////////////////////////////// diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RouterCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RouterCLI.java index 9a68794078e2..8aaefaedf1db 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RouterCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RouterCLI.java @@ -45,6 +45,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterRes import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusters; import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationApplicationRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationApplicationResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationQueuePoliciesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationQueuePoliciesResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesRequest; @@ -148,6 +150,9 @@ public class RouterCLI extends Configured implements Tool { private static final String OPTION_CURRENT_PAGE = "currentPage"; private static final String OPTION_QUEUE = "queue"; private static final String OPTION_QUEUES = "queues"; + // delete policy + private static final String OPTION_D = "d"; + private static final String OPTION_DELETE = "delete"; private static final String XML_TAG_SUBCLUSTERIDINFO = "subClusterIdInfo"; private static final String XML_TAG_AMRMPOLICYWEIGHTS = "amrmPolicyWeights"; @@ -213,6 +218,20 @@ public class RouterCLI extends Configured implements Tool { protected final static String POLICY_LIST_USAGE_EXAMPLE_2 = "yarn routeradmin -policy -list --pageSize 20 --currentPage 1 --queues root.a,root.b"; + protected final static UsageInfo POLICY_DELETE_USAGE = new UsageInfo( + "-d|--delete [--queue]", + "This command is used to delete the policy of the queue."); + + protected final static String POLICY_DELETE_USAGE_EXAMPLE_DESC = + "We delete the weight information of root.a. \\" + + "We can use --queue to specify the name of the queue."; + + protected final static String POLICY_DELETE_USAGE_EXAMPLE1 = + "yarn routeradmin -policy -d --queue root.a"; + + protected final static String POLICY_DELETE_USAGE_EXAMPLE2 = + "yarn routeradmin -policy --delete --queue root.a"; + protected final static RouterCmdUsageInfos POLICY_USAGEINFOS = new RouterCmdUsageInfos() // Policy Save .addUsageInfo(POLICY_SAVE_USAGE) @@ -228,7 +247,12 @@ public class RouterCLI extends Configured implements Tool { .addUsageInfo(POLICY_LIST_USAGE) .addExampleDescs(POLICY_LIST_USAGE.args, POLICY_LIST_USAGE_EXAMPLE_DESC) .addExample(POLICY_LIST_USAGE.args, POLICY_LIST_USAGE_EXAMPLE_1) - .addExample(POLICY_LIST_USAGE.args, POLICY_LIST_USAGE_EXAMPLE_2); + .addExample(POLICY_LIST_USAGE.args, POLICY_LIST_USAGE_EXAMPLE_2) + // Policy Delete + .addUsageInfo(POLICY_DELETE_USAGE) + .addExampleDescs(POLICY_DELETE_USAGE.args, POLICY_DELETE_USAGE_EXAMPLE_DESC) + .addExample(POLICY_DELETE_USAGE.args, POLICY_DELETE_USAGE_EXAMPLE1) + .addExample(POLICY_DELETE_USAGE.args, POLICY_DELETE_USAGE_EXAMPLE2); // Command3: application private static final String CMD_APPLICATION = "-application"; @@ -501,6 +525,8 @@ public class RouterCLI extends Configured implements Tool { "the queue we need to filter. example: root.a"); Option queuesOpt = new Option(null, "queues", true, "list of queues to filter. example: root.a,root.b,root.c"); + Option deleteOpt = new Option(OPTION_D, OPTION_DELETE, false, ""); + opts.addOption(saveOpt); opts.addOption(batchSaveOpt); opts.addOption(formatOpt); @@ -510,6 +536,7 @@ public class RouterCLI extends Configured implements Tool { opts.addOption(currentPageOpt); opts.addOption(queueOpt); opts.addOption(queuesOpt); + opts.addOption(deleteOpt); // Parse command line arguments. CommandLine cliParser; @@ -580,6 +607,10 @@ public class RouterCLI extends Configured implements Tool { // List Policies. return handListPolicies(pageSize, currentPage, queue, queues); + } else if (cliParser.hasOption(OPTION_D) || cliParser.hasOption(OPTION_DELETE)) { + String queue = cliParser.getOptionValue(OPTION_QUEUE); + // Delete Policy. + return handDeletePolicy(queue); } else { // printUsage printUsage(args[0]); @@ -886,6 +917,33 @@ public class RouterCLI extends Configured implements Tool { return 0; } + /** + * Delete queue weight information. + * + * @param queue Queue whose policy needs to be deleted. + * @return 0, success; 1, failed. + */ + protected int handDeletePolicy(String queue) { + LOG.info("Delete {} Policy.", queue); + try { + if (StringUtils.isBlank(queue)) { + System.err.println("Queue cannot be empty."); + } + List<String> queues = new ArrayList<>(); + queues.add(queue); + DeleteFederationQueuePoliciesRequest request = + DeleteFederationQueuePoliciesRequest.newInstance(queues); + ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol(); + DeleteFederationQueuePoliciesResponse response = + adminProtocol.deleteFederationPoliciesByQueues(request); + System.out.println(response.getMessage()); + return EXIT_SUCCESS; + } catch (Exception e) { + LOG.error("handDeletePolicy queue = {} error.", queue, e); + return EXIT_ERROR; + } + } + @Override public int run(String[] args) throws Exception { YarnConfiguration yarnConf = getConf() == null ? diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRouterCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRouterCLI.java index 8e697dd6c01d..31003e0e585b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRouterCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRouterCLI.java @@ -31,6 +31,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePoli import org.apache.hadoop.yarn.server.api.protocolrecords.FederationQueueWeight; import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePoliciesRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePoliciesResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationQueuePoliciesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationQueuePoliciesResponse; import org.junit.Before; import org.junit.Test; import org.mockito.stubbing.Answer; @@ -94,6 +96,15 @@ public class TestRouterCLI { return QueryFederationQueuePoliciesResponse.newInstance(1, 1, 1, 10, weights); }); + when(admin.deleteFederationPoliciesByQueues(any(DeleteFederationQueuePoliciesRequest.class))) + .thenAnswer((Answer<DeleteFederationQueuePoliciesResponse>) invocationOnMock -> { + // Step1. parse request. + Object obj = invocationOnMock.getArgument(0); + DeleteFederationQueuePoliciesRequest request = (DeleteFederationQueuePoliciesRequest) obj; + List<String> queues = request.getQueues(); + return DeleteFederationQueuePoliciesResponse.newInstance("queues = " + + StringUtils.join(queues, ",") + " delete success."); + }); Configuration config = new Configuration(); config.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true); @@ -288,7 +299,7 @@ public class TestRouterCLI { assertNotNull(policyUsageInfos); Map<String, List<String>> policyExamplesMap = policyUsageInfos.getExamples(); assertNotNull(policyExamplesMap); - assertEquals(3, policyExamplesMap.size()); + assertEquals(4, policyExamplesMap.size()); policyExamplesMap.forEach((cmd, cmdExamples) -> { assertEquals(2, cmdExamples.size()); }); @@ -299,4 +310,15 @@ public class TestRouterCLI { assertNotNull(applicationExamplesMap); assertEquals(1, applicationExamplesMap.size()); } + + @Test + public void testDeleteFederationPoliciesByQueues() throws Exception { + PrintStream oldOutPrintStream = System.out; + ByteArrayOutputStream dataOut = new ByteArrayOutputStream(); + System.setOut(new PrintStream(dataOut)); + oldOutPrintStream.println(dataOut); + + String[] args = {"-policy", "-d", "--queue", "root.a"}; + assertEquals(0, rmAdminCLI.run(args)); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceManagerAdministrationProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceManagerAdministrationProtocolPBClientImpl.java index 9d9c018a4696..b379ed62af74 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceManagerAdministrationProtocolPBClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceManagerAdministrationProtocolPBClientImpl.java @@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.SaveF import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.BatchSaveFederationQueuePoliciesRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.QueryFederationQueuePoliciesRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.DeleteFederationApplicationRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.DeleteFederationQueuePoliciesRequestProto; import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocolPB; import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest; @@ -87,6 +88,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueu import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePoliciesRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePoliciesResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationQueuePoliciesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationQueuePoliciesResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationApplicationRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationApplicationResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsRequestPBImpl; @@ -127,6 +130,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.QueryFederation import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.QueryFederationQueuePoliciesResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DeleteFederationApplicationRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DeleteFederationApplicationResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DeleteFederationQueuePoliciesRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DeleteFederationQueuePoliciesResponsePBImpl; import org.apache.hadoop.thirdparty.protobuf.ServiceException; @@ -439,4 +444,18 @@ public class ResourceManagerAdministrationProtocolPBClientImpl implements Resour } return null; } + + @Override + public DeleteFederationQueuePoliciesResponse deleteFederationPoliciesByQueues( + DeleteFederationQueuePoliciesRequest request) throws YarnException, IOException { + DeleteFederationQueuePoliciesRequestProto requestProto = + ((DeleteFederationQueuePoliciesRequestPBImpl) request).getProto(); + try { + return new DeleteFederationQueuePoliciesResponsePBImpl( + proxy.deleteFederationPoliciesByQueues(null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + } + return null; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/ResourceManagerAdministrationProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/ResourceManagerAdministrationProtocolPBServiceImpl.java index 1e0b09a61956..2cdb936b386f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/ResourceManagerAdministrationProtocolPBServiceImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/ResourceManagerAdministrationProtocolPBServiceImpl.java @@ -62,6 +62,8 @@ import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.SaveF import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.SaveFederationQueuePolicyResponseProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.DeleteFederationApplicationResponseProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.DeleteFederationApplicationRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.DeleteFederationQueuePoliciesRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.DeleteFederationQueuePoliciesResponseProto; import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocolPB; import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse; @@ -91,6 +93,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePol import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePoliciesResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationApplicationRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationApplicationResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationQueuePoliciesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationQueuePoliciesResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.CheckForDecommissioningNodesRequestPBImpl; @@ -129,6 +133,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.QueryFederation import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.QueryFederationQueuePoliciesResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DeleteFederationApplicationRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DeleteFederationApplicationResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DeleteFederationQueuePoliciesRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DeleteFederationQueuePoliciesResponsePBImpl; import org.apache.hadoop.thirdparty.protobuf.RpcController; import org.apache.hadoop.thirdparty.protobuf.ServiceException; @@ -467,4 +473,21 @@ public class ResourceManagerAdministrationProtocolPBServiceImpl implements Resou throw new ServiceException(e); } } + + @Override + public DeleteFederationQueuePoliciesResponseProto deleteFederationPoliciesByQueues( + RpcController controller, DeleteFederationQueuePoliciesRequestProto proto) + throws ServiceException { + DeleteFederationQueuePoliciesRequest requet = + new DeleteFederationQueuePoliciesRequestPBImpl(proto); + try { + DeleteFederationQueuePoliciesResponse response = + real.deleteFederationPoliciesByQueues(requet); + return ((DeleteFederationQueuePoliciesResponsePBImpl) response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DeleteFederationQueuePoliciesRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DeleteFederationQueuePoliciesRequestPBImpl.java new file mode 100644 index 000000000000..5573b76e604e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DeleteFederationQueuePoliciesRequestPBImpl.java @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.thirdparty.protobuf.TextFormat; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.DeleteFederationQueuePoliciesRequestProtoOrBuilder; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.DeleteFederationQueuePoliciesRequestProto; +import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationQueuePoliciesRequest; + +import java.util.ArrayList; +import java.util.List; + +/** + * Protocol buffer based implementation of {@link DeleteFederationQueuePoliciesRequest}. + */ +@Private +@Unstable +public class DeleteFederationQueuePoliciesRequestPBImpl + extends DeleteFederationQueuePoliciesRequest { + + private DeleteFederationQueuePoliciesRequestProto proto = + DeleteFederationQueuePoliciesRequestProto.getDefaultInstance(); + private DeleteFederationQueuePoliciesRequestProto.Builder builder = null; + private boolean viaProto = false; + private List<String> queues = null; + + public DeleteFederationQueuePoliciesRequestPBImpl() { + builder = DeleteFederationQueuePoliciesRequestProto.newBuilder(); + } + + public DeleteFederationQueuePoliciesRequestPBImpl( + DeleteFederationQueuePoliciesRequestProto proto) { + this.proto = proto; + viaProto = true; + } + + public DeleteFederationQueuePoliciesRequestProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void mergeLocalToBuilder() { + if (this.queues != null) { + addQueuesToProto(); + } + } + + private void addQueuesToProto() { + maybeInitBuilder(); + builder.clearQueues(); + if (this.queues == null) { + return; + } + builder.addAllQueues(this.queues); + } + + private void initQueues() { + if (this.queues != null) { + return; + } + DeleteFederationQueuePoliciesRequestProtoOrBuilder p = viaProto ? proto : builder; + List<String> list = p.getQueuesList(); + this.queues = new ArrayList<>(); + this.queues.addAll(list); + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public List<String> getQueues() { + if (this.queues != null) { + return this.queues; + } + initQueues(); + return this.queues; + } + + @Override + public void setQueues(List<String> pQueues) { + if (pQueues == null || pQueues.isEmpty()) { + maybeInitBuilder(); + if (this.queues != null) { + this.queues.clear(); + } + return; + } + if (this.queues == null) { + this.queues = new ArrayList<>(); + } + this.queues.clear(); + this.queues.addAll(pQueues); + } + + private synchronized void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = DeleteFederationQueuePoliciesRequestProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DeleteFederationQueuePoliciesResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DeleteFederationQueuePoliciesResponsePBImpl.java new file mode 100644 index 000000000000..f2226622989d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DeleteFederationQueuePoliciesResponsePBImpl.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.DeleteFederationQueuePoliciesResponseProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.DeleteFederationQueuePoliciesResponseProtoOrBuilder; +import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationQueuePoliciesResponse; + +/** + * Protocol buffer based implementation of {@link DeleteFederationQueuePoliciesResponse}. + */ +@Private +@Unstable +public class DeleteFederationQueuePoliciesResponsePBImpl + extends DeleteFederationQueuePoliciesResponse { + + private DeleteFederationQueuePoliciesResponseProto proto = + DeleteFederationQueuePoliciesResponseProto.getDefaultInstance(); + private DeleteFederationQueuePoliciesResponseProto.Builder builder = null; + private boolean viaProto = false; + + public DeleteFederationQueuePoliciesResponsePBImpl() { + builder = DeleteFederationQueuePoliciesResponseProto.newBuilder(); + } + + public DeleteFederationQueuePoliciesResponsePBImpl( + DeleteFederationQueuePoliciesResponseProto proto) { + this.proto = proto; + viaProto = true; + } + + public DeleteFederationQueuePoliciesResponseProto getProto() { + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof DeleteFederationQueuePoliciesResponse)) { + return false; + } + DeleteFederationQueuePoliciesResponsePBImpl otherImpl = this.getClass().cast(other); + return new EqualsBuilder().append(this.getProto(), otherImpl.getProto()).isEquals(); + } + + @Override + public String getMessage() { + DeleteFederationQueuePoliciesResponseProtoOrBuilder p = viaProto ? proto : builder; + boolean hasMessage = p.hasMessage(); + if (hasMessage) { + return p.getMessage(); + } + return null; + } + + private synchronized void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = DeleteFederationQueuePoliciesResponseProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public void setMessage(String msg) { + maybeInitBuilder(); + if (msg == null) { + builder.clearMessage(); + return; + } + builder.setMessage(msg); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/FederationPolicyStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/FederationPolicyStore.java index 0da8578ed47a..e6ab45d0f838 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/FederationPolicyStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/FederationPolicyStore.java @@ -27,6 +27,8 @@ import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolic import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse; import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest; import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationResponse; +import org.apache.hadoop.yarn.server.federation.store.records.DeleteSubClusterPoliciesConfigurationsRequest; +import org.apache.hadoop.yarn.server.federation.store.records.DeleteSubClusterPoliciesConfigurationsResponse; import org.apache.hadoop.yarn.server.federation.store.records.DeletePoliciesConfigurationsRequest; import org.apache.hadoop.yarn.server.federation.store.records.DeletePoliciesConfigurationsResponse; @@ -76,6 +78,16 @@ public interface FederationPolicyStore { GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations( GetSubClusterPoliciesConfigurationsRequest request) throws YarnException; + /** + * Delete PoliciesConfigurations. + * + * @param request List containing delete queues. + * @return response empty means the queue list has been deleted successfully. + * @throws YarnException if the request is invalid/fails + */ + DeleteSubClusterPoliciesConfigurationsResponse deletePoliciesConfigurations( + DeleteSubClusterPoliciesConfigurationsRequest request) throws YarnException; + /** * Delete all queue-to-policy configurations. * diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java index 78c7b31f4df4..f4de70d8ffd9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java @@ -96,6 +96,8 @@ import org.apache.hadoop.yarn.server.federation.store.records.RouterRMDTSecretMa import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenResponse; import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenRequest; import org.apache.hadoop.yarn.server.federation.store.records.RouterStoreToken; +import org.apache.hadoop.yarn.server.federation.store.records.DeleteSubClusterPoliciesConfigurationsRequest; +import org.apache.hadoop.yarn.server.federation.store.records.DeleteSubClusterPoliciesConfigurationsResponse; import org.apache.hadoop.yarn.server.federation.store.utils.FederationApplicationHomeSubClusterStoreInputValidator; import org.apache.hadoop.yarn.server.federation.store.utils.FederationReservationHomeSubClusterStoreInputValidator; import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembershipStateStoreInputValidator; @@ -402,6 +404,19 @@ public class MemoryFederationStateStore implements FederationStateStore { return GetSubClusterPoliciesConfigurationsResponse.newInstance(result); } + @Override + public DeleteSubClusterPoliciesConfigurationsResponse deletePoliciesConfigurations( + DeleteSubClusterPoliciesConfigurationsRequest request) throws YarnException { + FederationPolicyStoreInputValidator.validate(request); + for (String queue : request.getQueues()) { + if (policies.containsKey(queue)) { + policies.remove(queue); + LOG.info("The queue = {} policy has been deleted.", queue); + } + } + return DeleteSubClusterPoliciesConfigurationsResponse.newInstance(); + } + @Override public DeletePoliciesConfigurationsResponse deleteAllPoliciesConfigurations( DeletePoliciesConfigurationsRequest request) throws Exception { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java index 2be810725645..14861539f16e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java @@ -100,6 +100,8 @@ import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenReque import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenResponse; import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKey; import org.apache.hadoop.yarn.server.federation.store.records.RouterStoreToken; +import org.apache.hadoop.yarn.server.federation.store.records.DeleteSubClusterPoliciesConfigurationsRequest; +import org.apache.hadoop.yarn.server.federation.store.records.DeleteSubClusterPoliciesConfigurationsResponse; import org.apache.hadoop.yarn.server.federation.store.utils.FederationApplicationHomeSubClusterStoreInputValidator; import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembershipStateStoreInputValidator; import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator; @@ -1073,6 +1075,34 @@ public class SQLFederationStateStore implements FederationStateStore { return GetSubClusterPoliciesConfigurationsResponse.newInstance(policyConfigurations); } + @Override + public DeleteSubClusterPoliciesConfigurationsResponse deletePoliciesConfigurations( + DeleteSubClusterPoliciesConfigurationsRequest request) throws YarnException { + FederationPolicyStoreInputValidator.validate(request); + Connection connection = null; + try { + connection = getConnection(false); + FederationQueryRunner runner = new FederationQueryRunner(); + for (String queue : request.getQueues()) { + LOG.info("delete queue = {} policy start.", queue); + runner.deletePolicyByQueue(connection, queue); + LOG.info("delete queue = {} policy finished.", queue); + } + return DeleteSubClusterPoliciesConfigurationsResponse.newInstance(); + } catch (Exception e) { + FederationStateStoreUtils.logAndThrowRetriableException(LOG, + "Could not delete queue policy!", e); + } finally { + // Return to the pool the CallableStatement + try { + FederationStateStoreUtils.returnToPool(LOG, null, connection); + } catch (YarnException e) { + LOG.error("close connection error.", e); + } + } + return null; + } + @Override public DeletePoliciesConfigurationsResponse deleteAllPoliciesConfigurations( DeletePoliciesConfigurationsRequest request) throws Exception { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java index 2ae05eb865cd..63b853e82ab6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java @@ -97,6 +97,8 @@ import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyRes import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyRequest; import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenResponse; import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenRequest; +import org.apache.hadoop.yarn.server.federation.store.records.DeleteSubClusterPoliciesConfigurationsRequest; +import org.apache.hadoop.yarn.server.federation.store.records.DeleteSubClusterPoliciesConfigurationsResponse; import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterIdPBImpl; import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterInfoPBImpl; import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterPolicyConfigurationPBImpl; @@ -788,6 +790,39 @@ public class ZookeeperFederationStateStore implements FederationStateStore { return GetSubClusterPoliciesConfigurationsResponse.newInstance(result); } + @Override + public DeleteSubClusterPoliciesConfigurationsResponse deletePoliciesConfigurations( + DeleteSubClusterPoliciesConfigurationsRequest request) throws YarnException { + FederationPolicyStoreInputValidator.validate(request); + List<String> queues = request.getQueues(); + for (String queue : queues) { + deletePolicyConfigurationByQueue(queue); + } + return DeleteSubClusterPoliciesConfigurationsResponse.newInstance(); + } + + private void deletePolicyConfigurationByQueue(String queue) { + String policyZNode = getNodePath(policiesZNode, queue); + + boolean exists = false; + try { + exists = zkManager.exists(policyZNode); + } catch (Exception e) { + LOG.error("An error occurred when checking whether the queue = {} policy exists.", queue, e); + } + + if (!exists) { + LOG.error("The policy of the queue = {} does not exist.", queue); + return; + } + + try { + zkManager.delete(policyZNode); + } catch (Exception e) { + LOG.error("Queue {} policy cannot be deleted.", queue, e); + } + } + @Override public DeletePoliciesConfigurationsResponse deleteAllPoliciesConfigurations( DeletePoliciesConfigurationsRequest request) throws Exception { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/DeleteSubClusterPoliciesConfigurationsRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/DeleteSubClusterPoliciesConfigurationsRequest.java new file mode 100644 index 000000000000..0bcd8a91004f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/DeleteSubClusterPoliciesConfigurationsRequest.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.federation.store.records; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.util.Records; + +import java.util.List; + +/** + * This class is used to respond to queue deletion requests and contains a list of queues. + */ +@Private +@Unstable +public abstract class DeleteSubClusterPoliciesConfigurationsRequest { + + @Private + @Unstable + public static DeleteSubClusterPoliciesConfigurationsRequest newInstance( + List<String> queues) { + DeleteSubClusterPoliciesConfigurationsRequest request = + Records.newRecord(DeleteSubClusterPoliciesConfigurationsRequest.class); + request.setQueues(queues); + return request; + } + + /** + * To obtain the list of queues to be deleted. + * + * @return list of queue names. + */ + @Public + @Unstable + public abstract List<String> getQueues(); + + /** + * Set the list of queues to be deleted. + * + * @param queues list of queue names. + */ + @Private + @Unstable + public abstract void setQueues(List<String> queues); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/DeleteSubClusterPoliciesConfigurationsResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/DeleteSubClusterPoliciesConfigurationsResponse.java new file mode 100644 index 000000000000..10d2df58b042 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/DeleteSubClusterPoliciesConfigurationsResponse.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.federation.store.records; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.util.Records; + +/** + * DeleteSubClusterPoliciesConfigurationsResponse contains the answer from the + * {@code FederationPolicyStore} to a request to delete policy configurations + * for given queues. + */ +@Private +@Unstable +public abstract class DeleteSubClusterPoliciesConfigurationsResponse { + public static DeleteSubClusterPoliciesConfigurationsResponse newInstance() { + return Records.newRecord(DeleteSubClusterPoliciesConfigurationsResponse.class); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/DeleteSubClusterPoliciesConfigurationsRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/DeleteSubClusterPoliciesConfigurationsRequestPBImpl.java new file mode 100644 index 000000000000..e73fe87ada57 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/DeleteSubClusterPoliciesConfigurationsRequestPBImpl.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.yarn.server.federation.store.records.impl.pb; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.thirdparty.protobuf.TextFormat; +import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.DeleteSubClusterPoliciesConfigurationsRequestProtoOrBuilder; +import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.DeleteSubClusterPoliciesConfigurationsRequestProto; +import org.apache.hadoop.yarn.server.federation.store.records.DeleteSubClusterPoliciesConfigurationsRequest; + +import java.util.ArrayList; +import java.util.List; + +/** + * Protocol buffer based implementation of {@link DeleteSubClusterPoliciesConfigurationsRequest}. + */ +@Private +@Unstable +public class DeleteSubClusterPoliciesConfigurationsRequestPBImpl extends + DeleteSubClusterPoliciesConfigurationsRequest { + + private DeleteSubClusterPoliciesConfigurationsRequestProto proto = + DeleteSubClusterPoliciesConfigurationsRequestProto.getDefaultInstance(); + private DeleteSubClusterPoliciesConfigurationsRequestProto.Builder builder = null; + private boolean viaProto = false; + private List<String> queues = null; + + public DeleteSubClusterPoliciesConfigurationsRequestPBImpl() { + builder = DeleteSubClusterPoliciesConfigurationsRequestProto.newBuilder(); + } + + public DeleteSubClusterPoliciesConfigurationsRequestPBImpl( + DeleteSubClusterPoliciesConfigurationsRequestProto proto) { + this.proto = proto; + viaProto = true; + } + + public DeleteSubClusterPoliciesConfigurationsRequestProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void mergeLocalToBuilder() { + if (this.queues != null) { + addQueuesToProto(); + } + } + + private void addQueuesToProto() { + maybeInitBuilder(); + builder.clearQueues(); + if (this.queues == null) { + return; + } + builder.addAllQueues(this.queues); + } + + private void initQueues() { + if (this.queues != null) { + return; + } + DeleteSubClusterPoliciesConfigurationsRequestProtoOrBuilder p = viaProto ? proto : builder; + List<String> list = p.getQueuesList(); + this.queues = new ArrayList<>(); + this.queues.addAll(list); + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public List<String> getQueues() { + if (this.queues != null) { + return this.queues; + } + initQueues(); + return this.queues; + } + + @Override + public void setQueues(List<String> pQueues) { + if (pQueues == null || pQueues.isEmpty()) { + maybeInitBuilder(); + if (this.queues != null) { + this.queues.clear(); + } + return; + } + if (this.queues == null) { + this.queues = new ArrayList<>(); + } + this.queues.clear(); + this.queues.addAll(pQueues); + } + + private synchronized void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = DeleteSubClusterPoliciesConfigurationsRequestProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/DeleteSubClusterPoliciesConfigurationsResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/DeleteSubClusterPoliciesConfigurationsResponsePBImpl.java new file mode 100644 index 000000000000..dabeea8b27ec --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/DeleteSubClusterPoliciesConfigurationsResponsePBImpl.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.yarn.server.federation.store.records.impl.pb; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.thirdparty.protobuf.TextFormat; +import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.DeleteSubClusterPoliciesConfigurationsResponseProto; +import org.apache.hadoop.yarn.server.federation.store.records.DeleteSubClusterPoliciesConfigurationsResponse; + +/** + * Protocol buffer based implementation of + * {@link DeleteSubClusterPoliciesConfigurationsResponse}. + */ +@Private +@Unstable +public class DeleteSubClusterPoliciesConfigurationsResponsePBImpl + extends DeleteSubClusterPoliciesConfigurationsResponse { + + private DeleteSubClusterPoliciesConfigurationsResponseProto proto = + DeleteSubClusterPoliciesConfigurationsResponseProto.getDefaultInstance(); + private DeleteSubClusterPoliciesConfigurationsResponseProto.Builder builder = null; + private boolean viaProto = false; + + public DeleteSubClusterPoliciesConfigurationsResponsePBImpl() { + builder = DeleteSubClusterPoliciesConfigurationsResponseProto.newBuilder(); + } + + public DeleteSubClusterPoliciesConfigurationsResponsePBImpl( + DeleteSubClusterPoliciesConfigurationsResponseProto proto) { + this.proto = proto; + viaProto = true; + } + + public DeleteSubClusterPoliciesConfigurationsResponseProto getProto() { + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/sql/FederationQueryRunner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/sql/FederationQueryRunner.java index 99ab7b2e1f71..4ff56eef01eb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/sql/FederationQueryRunner.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/sql/FederationQueryRunner.java @@ -45,12 +45,14 @@ public class FederationQueryRunner { public final static String QUERY_SEQUENCE_TABLE_SQL = "SELECT nextVal FROM sequenceTable WHERE sequenceName = %s"; - public final static String INSERT_SEQUENCE_TABLE_SQL = "" + + public final static String INSERT_SEQUENCE_TABLE_SQL = "INSERT INTO sequenceTable(sequenceName, nextVal) VALUES(%s, %d)"; - public final static String UPDATE_SEQUENCE_TABLE_SQL = "" + + public final static String UPDATE_SEQUENCE_TABLE_SQL = "UPDATE sequenceTable SET nextVal = %d WHERE sequenceName = %s"; + public final static String DELETE_QUEUE_SQL = "DELETE FROM policies WHERE queue = %s"; + public static final Logger LOG = LoggerFactory.getLogger(FederationQueryRunner.class); /** @@ -294,6 +296,26 @@ public class FederationQueryRunner { } } + public void deletePolicyByQueue(Connection connection, String queue) + throws SQLException { + String deleteSQL = String.format(DELETE_QUEUE_SQL, quoteString(queue)); + boolean committed = false; + Statement statement = null; + try { + statement = connection.createStatement(); + statement.executeUpdate(deleteSQL); + connection.commit(); + committed = true; + } catch (SQLException e) { + throw new SQLException("Unable to deletePolicyByQueue due to: " + e.getMessage()); + } finally { + if (!committed) { + rollbackDBConn(connection); + } + close(statement); + } + } + public void truncateTable(Connection connection, String tableName) throws SQLException { DbType dbType = DatabaseProduct.getDbType(connection); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationPolicyStoreInputValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationPolicyStoreInputValidator.java index 3c68bfdace7c..d56efb4dcecb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationPolicyStoreInputValidator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationPolicyStoreInputValidator.java @@ -18,13 +18,17 @@ package org.apache.hadoop.yarn.server.federation.store.utils; +import org.apache.commons.collections.CollectionUtils; import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException; +import org.apache.hadoop.yarn.server.federation.store.records.DeleteSubClusterPoliciesConfigurationsRequest; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest; import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.List; + /** * Utility class to validate the inputs to {@code FederationPolicyStore}, allows * a fail fast mechanism for invalid user inputs. @@ -140,4 +144,19 @@ public final class FederationPolicyStoreInputValidator { } } + public static void validate(DeleteSubClusterPoliciesConfigurationsRequest request) + throws FederationStateStoreInvalidInputException { + if (request == null) { + String message = "Missing DeleteSubClusterPoliciesConfigurationsRequest Request." + + " Please try again by specifying an policy insertion information."; + LOG.warn(message); + throw new FederationStateStoreInvalidInputException(message); + } + + List<String> queues = request.getQueues(); + if (CollectionUtils.isEmpty(queues)) { + throw new FederationStateStoreInvalidInputException( + "The queues that needs to be deleted cannot be empty."); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java index c0026d5222d4..c0741a6c625b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java @@ -86,6 +86,7 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregist import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse; import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.DeleteSubClusterPoliciesConfigurationsRequest; import org.apache.hadoop.yarn.server.federation.store.records.DeletePoliciesConfigurationsRequest; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.webapp.NotFoundException; @@ -1128,4 +1129,13 @@ public final class FederationStateStoreFacade { public void deleteStore() throws Exception { stateStore.deleteStateStore(); } + + public void deletePolicyConfigurations(List<String> queuesList) throws YarnException { + if (CollectionUtils.isEmpty(queuesList)) { + throw new YarnException("queuesList cannot be empty!"); + } + DeleteSubClusterPoliciesConfigurationsRequest request = + DeleteSubClusterPoliciesConfigurationsRequest.newInstance(queuesList); + stateStore.deletePoliciesConfigurations(request); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_federation_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_federation_protos.proto index 9e2316dbbd48..5664a23ebf68 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_federation_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_federation_protos.proto @@ -196,6 +196,13 @@ message DeleteReservationHomeSubClusterRequestProto { message DeleteReservationHomeSubClusterResponseProto { } +message DeleteSubClusterPoliciesConfigurationsRequestProto { + repeated string queues = 1; +} + +message DeleteSubClusterPoliciesConfigurationsResponseProto { +} + message DeletePoliciesConfigurationsRequestProto { } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java index 0823660bc040..9ce84356d6bf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java @@ -183,6 +183,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePol import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePoliciesResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationApplicationRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationApplicationResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationQueuePoliciesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationQueuePoliciesResponse; import org.apache.hadoop.thirdparty.com.google.common.base.Strings; /** @@ -996,6 +998,12 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, return null; } + @Override + public DeleteFederationQueuePoliciesResponse deleteFederationPoliciesByQueues( + DeleteFederationQueuePoliciesRequest request) throws YarnException, IOException { + return null; + } + @VisibleForTesting public HashMap<ApplicationId, List<ContainerId>> getApplicationContainerIdMap() { return applicationContainerIdMap; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java index 1d7cca651b5e..a0073e0575c8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java @@ -22,6 +22,7 @@ import java.nio.ByteBuffer; import java.sql.SQLException; import java.util.Calendar; import java.util.List; +import java.util.ArrayList; import java.util.Set; import java.util.HashSet; import java.util.TimeZone; @@ -79,6 +80,7 @@ import org.apache.hadoop.yarn.server.federation.store.records.DeletePoliciesConf import org.apache.hadoop.yarn.server.federation.store.records.DeletePoliciesConfigurationsResponse; import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.DeleteSubClusterPoliciesConfigurationsRequest; import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKey; import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyRequest; import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyResponse; @@ -1140,6 +1142,54 @@ public abstract class FederationStateStoreBaseTest { assertEquals(0, appsHomeSubClusters.size()); } + @Test + public void testDeletePoliciesConfigurations() throws Exception { + + // Step1. We initialize the policy of the queue + FederationStateStore federationStateStore = this.getStateStore(); + setPolicyConf("Queue1", "PolicyType1"); + setPolicyConf("Queue2", "PolicyType2"); + setPolicyConf("Queue3", "PolicyType3"); + + List<String> queues = new ArrayList<>(); + queues.add("Queue1"); + queues.add("Queue2"); + queues.add("Queue3"); + + GetSubClusterPoliciesConfigurationsRequest policyRequest = + GetSubClusterPoliciesConfigurationsRequest.newInstance(); + GetSubClusterPoliciesConfigurationsResponse response = + stateStore.getPoliciesConfigurations(policyRequest); + + // Step2. Confirm that the initialized queue policy meets expectations. + Assert.assertNotNull(response); + List<SubClusterPolicyConfiguration> policiesConfigs = response.getPoliciesConfigs(); + for (SubClusterPolicyConfiguration policyConfig : policiesConfigs) { + Assert.assertTrue(queues.contains(policyConfig.getQueue())); + } + + // Step3. Delete the policy of queue (Queue1, Queue2). + List<String> deleteQueues = new ArrayList<>(); + deleteQueues.add("Queue1"); + deleteQueues.add("Queue2"); + DeleteSubClusterPoliciesConfigurationsRequest deleteRequest = + DeleteSubClusterPoliciesConfigurationsRequest.newInstance(deleteQueues); + federationStateStore.deletePoliciesConfigurations(deleteRequest); + + // Step4. Confirm that the queue has been deleted, + // that is, all currently returned queues do not exist in the deletion list. + GetSubClusterPoliciesConfigurationsRequest policyRequest2 = + GetSubClusterPoliciesConfigurationsRequest.newInstance(); + GetSubClusterPoliciesConfigurationsResponse response2 = + stateStore.getPoliciesConfigurations(policyRequest2); + Assert.assertNotNull(response2); + List<SubClusterPolicyConfiguration> policiesConfigs2 = response2.getPoliciesConfigs(); + for (SubClusterPolicyConfiguration policyConfig : policiesConfigs2) { + Assert.assertFalse(deleteQueues.contains(policyConfig.getQueue())); + } + } + + @Test public void testDeletePolicyStore() throws Exception { // Step1. We delete all Policies Configurations. FederationStateStore federationStateStore = this.getStateStore(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java index c45c34dc85c8..72bfde4e81d4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java @@ -105,6 +105,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePol import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePoliciesResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationApplicationRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationApplicationResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationQueuePoliciesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationQueuePoliciesResponse; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; import org.apache.hadoop.yarn.server.resourcemanager.resource.DynamicResourceConfiguration; @@ -1115,6 +1117,14 @@ public class AdminService extends CompositeService implements " Please call Router's deleteFederationApplication to delete Application."); } + @Override + public DeleteFederationQueuePoliciesResponse deleteFederationPoliciesByQueues( + DeleteFederationQueuePoliciesRequest request) throws YarnException, IOException { + throw new YarnException("It is not allowed to call the RM's " + + " deleteFederationQueuePoliciesByQueues. " + + " Please call Router's deleteFederationQueuePoliciesByQueues to delete Policies."); + } + private void validateAttributesExists( List<NodeToAttributes> nodesToAttributes) throws IOException { NodeAttributesManager nodeAttributesManager = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java index bbb6687b054d..6384736d62e1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java @@ -78,6 +78,8 @@ import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationH import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterResponse; import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.DeleteSubClusterPoliciesConfigurationsRequest; +import org.apache.hadoop.yarn.server.federation.store.records.DeleteSubClusterPoliciesConfigurationsResponse; import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyRequest; import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyResponse; import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenRequest; @@ -324,6 +326,16 @@ public class FederationStateStoreService extends AbstractService return clientMethod.invoke(); } + @Override + public DeleteSubClusterPoliciesConfigurationsResponse deletePoliciesConfigurations( + DeleteSubClusterPoliciesConfigurationsRequest request) throws YarnException { + FederationClientMethod<DeleteSubClusterPoliciesConfigurationsResponse> clientMethod = + new FederationClientMethod<>("deletePoliciesConfigurations", + DeleteSubClusterPoliciesConfigurationsRequest.class, request, + DeleteSubClusterPoliciesConfigurationsResponse.class, stateStoreClient, clock); + return clientMethod.invoke(); + } + @Override public DeletePoliciesConfigurationsResponse deleteAllPoliciesConfigurations( DeletePoliciesConfigurationsRequest request) throws Exception { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java index 66157797a936..e1ff6a87d7bd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java @@ -157,6 +157,8 @@ public final class RouterMetrics { private MutableGaugeInt numListFederationQueuePoliciesFailedRetrieved; @Metric("# of deleteFederationApplication failed to be retrieved") private MutableGaugeInt numDeleteFederationApplicationFailedRetrieved; + @Metric("# of deleteFederationPoliciesByQueues failed to be retrieved") + private MutableGaugeInt numDeleteFederationPoliciesByQueuesRetrieved; @Metric("# of refreshAdminAcls failed to be retrieved") private MutableGaugeInt numRefreshAdminAclsFailedRetrieved; @Metric("# of refreshServiceAcls failed to be retrieved") @@ -311,6 +313,8 @@ public final class RouterMetrics { private MutableRate totalSucceededListFederationQueuePoliciesFailedRetrieved; @Metric("Total number of successful Retrieved DeleteFederationApplication and latency(ms)") private MutableRate totalSucceededDeleteFederationApplicationFailedRetrieved; + @Metric("Total number of successful Retrieved DeleteFederationPoliciesByQueues and latency(ms)") + private MutableRate totalSucceededDeleteFederationPoliciesByQueuesRetrieved; @Metric("Total number of successful Retrieved RefreshAdminAcls and latency(ms)") private MutableRate totalSucceededRefreshAdminAclsRetrieved; @Metric("Total number of successful Retrieved RefreshServiceAcls and latency(ms)") @@ -401,6 +405,7 @@ public final class RouterMetrics { private MutableQuantiles batchSaveFederationQueuePoliciesLatency; private MutableQuantiles listFederationQueuePoliciesLatency; private MutableQuantiles deleteFederationApplicationLatency; + private MutableQuantiles deleteFederationPoliciesByQueuesLatency; private MutableQuantiles refreshAdminAclsLatency; private MutableQuantiles refreshServiceAclsLatency; private MutableQuantiles replaceLabelsOnNodesLatency; @@ -627,6 +632,10 @@ public final class RouterMetrics { "deleteFederationApplicationLatency", "latency of delete FederationApplication timeouts", "ops", "latency", 10); + deleteFederationPoliciesByQueuesLatency = registry.newQuantiles( + "deleteFederationPoliciesByQueuesLatency", + "latency of delete FederationPoliciesByQueues timeouts", "ops", "latency", 10); + refreshAdminAclsLatency = registry.newQuantiles("refreshAdminAclsLatency", "latency of refresh admin acls timeouts", "ops", "latency", 10); @@ -976,6 +985,11 @@ public final class RouterMetrics { return totalSucceededDeleteFederationApplicationFailedRetrieved.lastStat().numSamples(); } + @VisibleForTesting + public long getNumSucceededDeleteFederationPoliciesByQueuesRetrieved() { + return totalSucceededDeleteFederationPoliciesByQueuesRetrieved.lastStat().numSamples(); + } + @VisibleForTesting public long getNumSucceededRefreshAdminAclsRetrieved() { return totalSucceededRefreshAdminAclsRetrieved.lastStat().numSamples(); @@ -1341,6 +1355,11 @@ public final class RouterMetrics { return totalSucceededDeleteFederationApplicationFailedRetrieved.lastStat().mean(); } + @VisibleForTesting + public double getLatencySucceededDeleteFederationPoliciesByQueuesRetrieved() { + return totalSucceededDeleteFederationPoliciesByQueuesRetrieved.lastStat().mean(); + } + @VisibleForTesting public double getLatencySucceededRefreshAdminAclsRetrieved() { return totalSucceededRefreshAdminAclsRetrieved.lastStat().mean(); @@ -1652,6 +1671,10 @@ public final class RouterMetrics { return numDeleteFederationApplicationFailedRetrieved.value(); } + public int getDeleteFederationPoliciesByQueuesRetrieved() { + return numDeleteFederationPoliciesByQueuesRetrieved.value(); + } + public int getNumRefreshAdminAclsFailedRetrieved() { return numRefreshAdminAclsFailedRetrieved.value(); } @@ -2024,6 +2047,11 @@ public final class RouterMetrics { deleteFederationApplicationLatency.add(duration); } + public void succeededDeleteFederationPoliciesByQueuesRetrieved(long duration) { + totalSucceededDeleteFederationPoliciesByQueuesRetrieved.add(duration); + deleteFederationPoliciesByQueuesLatency.add(duration); + } + public void succeededRefreshAdminAclsRetrieved(long duration) { totalSucceededRefreshAdminAclsRetrieved.add(duration); refreshAdminAclsLatency.add(duration); @@ -2318,6 +2346,10 @@ public final class RouterMetrics { numDeleteFederationApplicationFailedRetrieved.incr(); } + public void incrDeleteFederationPoliciesByQueuesRetrieved() { + numDeleteFederationPoliciesByQueuesRetrieved.incr(); + } + public void incrRefreshAdminAclsFailedRetrieved() { numRefreshAdminAclsFailedRetrieved.incr(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/DefaultRMAdminRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/DefaultRMAdminRequestInterceptor.java index 2633a90d62cc..64fcd70cd1f0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/DefaultRMAdminRequestInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/DefaultRMAdminRequestInterceptor.java @@ -66,6 +66,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePol import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePoliciesResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationApplicationRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationApplicationResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationQueuePoliciesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationQueuePoliciesResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -242,4 +244,10 @@ public class DefaultRMAdminRequestInterceptor throws YarnException, IOException { return rmAdminProxy.deleteFederationApplication(request); } + + @Override + public DeleteFederationQueuePoliciesResponse deleteFederationPoliciesByQueues( + DeleteFederationQueuePoliciesRequest request) throws YarnException, IOException { + return rmAdminProxy.deleteFederationPoliciesByQueues(request); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java index 230308e361ff..137cfa626be0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java @@ -72,6 +72,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueu import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePoliciesRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePoliciesResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationQueuePoliciesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationQueuePoliciesResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationApplicationRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationApplicationResponse; import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil; @@ -1140,6 +1142,46 @@ public class FederationRMAdminInterceptor extends AbstractRMAdminRequestIntercep throw new YarnException("Unable to deleteFederationApplication."); } + /** + * Delete Policies based on the provided queue list. + * + * @param request DeleteFederationQueuePoliciesRequest Request. + * @return If the deletion is successful, the queue deletion success message will be returned. + * @throws YarnException indicates exceptions from yarn servers. + * @throws IOException io error occurs. + */ + @Override + public DeleteFederationQueuePoliciesResponse deleteFederationPoliciesByQueues( + DeleteFederationQueuePoliciesRequest request) throws YarnException, IOException { + + // Parameter validation. + if (request == null) { + routerMetrics.incrDeleteFederationPoliciesByQueuesRetrieved(); + RouterServerUtil.logAndThrowException( + "Missing deleteFederationQueuePoliciesByQueues Request.", null); + } + + List<String> queues = request.getQueues(); + if (CollectionUtils.isEmpty(queues)) { + routerMetrics.incrDeleteFederationPoliciesByQueuesRetrieved(); + RouterServerUtil.logAndThrowException("queues cannot be null.", null); + } + + // Try calling deleteApplicationHomeSubCluster to delete the application. + try { + long startTime = clock.getTime(); + federationFacade.deletePolicyConfigurations(queues); + long stopTime = clock.getTime(); + routerMetrics.succeededDeleteFederationPoliciesByQueuesRetrieved(stopTime - startTime); + return DeleteFederationQueuePoliciesResponse.newInstance( + "queues = " + StringUtils.join(queues, ",") + " delete success."); + } catch (Exception e) { + RouterServerUtil.logAndThrowException(e, + "Unable to deleteFederationPoliciesByQueues due to exception. " + e.getMessage()); + } + throw new YarnException("Unable to deleteFederationPoliciesByQueues."); + } + /** * According to the configuration information of the queue filtering queue, * this part should only return 1 result. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RouterRMAdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RouterRMAdminService.java index 718abd289449..d9e9b10d871d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RouterRMAdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RouterRMAdminService.java @@ -73,6 +73,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePol import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePoliciesResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationApplicationRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationApplicationResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationQueuePoliciesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationQueuePoliciesResponse; import org.apache.hadoop.yarn.server.router.RouterServerUtil; import org.apache.hadoop.yarn.server.router.security.authorize.RouterPolicyProvider; import org.apache.hadoop.yarn.util.LRUCacheHashMap; @@ -428,4 +430,11 @@ public class RouterRMAdminService extends AbstractService RequestInterceptorChainWrapper pipeline = getInterceptorChain(); return pipeline.getRootInterceptor().deleteFederationApplication(request); } + + @Override + public DeleteFederationQueuePoliciesResponse deleteFederationPoliciesByQueues( + DeleteFederationQueuePoliciesRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().deleteFederationPoliciesByQueues(request); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java index 60d5e5303c7f..61542eba5d86 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java @@ -638,6 +638,11 @@ public class TestRouterMetrics { LOG.info("Mocked: failed ListFederationQueuePolicies call"); metrics.incrListFederationQueuePoliciesFailedRetrieved(); } + + public void getDeleteFederationPoliciesByQueuesFailedRetrieved() { + LOG.info("Mocked: failed DeleteFederationPoliciesByQueues call"); + metrics.incrDeleteFederationPoliciesByQueuesRetrieved(); + } } // Records successes for all calls @@ -985,6 +990,12 @@ public class TestRouterMetrics { " call with duration {}", duration); metrics.succeededListFederationQueuePoliciesRetrieved(duration); } + + public void deleteFederationPoliciesByQueuesRetrieved(long duration) { + LOG.info("Mocked: successful DeleteFederationPoliciesByQueuesRetrieved " + + " call with duration {}", duration); + metrics.succeededDeleteFederationPoliciesByQueuesRetrieved(duration); + } } @Test @@ -2311,4 +2322,29 @@ public class TestRouterMetrics { Assert.assertEquals(225, metrics.getLatencySucceededListFederationQueuePoliciesRetrieved(), ASSERT_DOUBLE_DELTA); } + + @Test + public void testDeleteFederationPoliciesByQueuesFailedRetrieved() { + long totalBadBefore = metrics.getDeleteFederationPoliciesByQueuesRetrieved(); + badSubCluster.getDeleteFederationPoliciesByQueuesFailedRetrieved(); + Assert.assertEquals(totalBadBefore + 1, + metrics.getDeleteFederationPoliciesByQueuesRetrieved()); + } + + @Test + public void testDeleteFederationPoliciesByQueuesRetrieved() { + long totalGoodBefore = metrics.getNumSucceededDeleteFederationPoliciesByQueuesRetrieved(); + goodSubCluster.deleteFederationPoliciesByQueuesRetrieved(150); + Assert.assertEquals(totalGoodBefore + 1, + metrics.getNumSucceededDeleteFederationPoliciesByQueuesRetrieved()); + Assert.assertEquals(150, + metrics.getLatencySucceededDeleteFederationPoliciesByQueuesRetrieved(), + ASSERT_DOUBLE_DELTA); + goodSubCluster.deleteFederationPoliciesByQueuesRetrieved(300); + Assert.assertEquals(totalGoodBefore + 2, + metrics.getNumSucceededDeleteFederationPoliciesByQueuesRetrieved()); + Assert.assertEquals(225, + metrics.getLatencySucceededDeleteFederationPoliciesByQueuesRetrieved(), + ASSERT_DOUBLE_DELTA); + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/PassThroughRMAdminRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/PassThroughRMAdminRequestInterceptor.java index 0dab931a7ab9..43c925946da0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/PassThroughRMAdminRequestInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/PassThroughRMAdminRequestInterceptor.java @@ -60,6 +60,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePol import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePoliciesResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationApplicationRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationApplicationResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationQueuePoliciesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationQueuePoliciesResponse; /** * Mock interceptor that does not do anything other than forwarding it to the @@ -193,4 +195,10 @@ public class PassThroughRMAdminRequestInterceptor DeleteFederationApplicationRequest request) throws YarnException, IOException { return getNextInterceptor().deleteFederationApplication(request); } + + @Override + public DeleteFederationQueuePoliciesResponse deleteFederationPoliciesByQueues( + DeleteFederationQueuePoliciesRequest request) throws YarnException, IOException { + return getNextInterceptor().deleteFederationPoliciesByQueues(request); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestFederationRMAdminInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestFederationRMAdminInterceptor.java index a1339e419040..8d33b044e3e0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestFederationRMAdminInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestFederationRMAdminInterceptor.java @@ -67,6 +67,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePol import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePoliciesResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationApplicationRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationApplicationResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationQueuePoliciesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationQueuePoliciesResponse; import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; import org.apache.hadoop.yarn.server.federation.policies.manager.WeightedLocalityPolicyManager; import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore; @@ -1006,7 +1008,6 @@ public class TestFederationRMAdminInterceptor extends BaseRouterRMAdminTest { () -> interceptor.listFederationQueuePolicies(request8)); } - @Test public void testDeleteFederationApplication() throws Exception { ApplicationId applicationId = ApplicationId.newInstance(10, 1); @@ -1029,4 +1030,44 @@ public class TestFederationRMAdminInterceptor extends BaseRouterRMAdminTest { assertEquals("applicationId = " + applicationId2 + " delete success.", deleteFederationApplicationResponse.getMessage()); } + + @Test + public void testDeleteFederationPoliciesByQueues() throws IOException, YarnException { + // subClusters + List<String> subClusterLists = new ArrayList<>(); + subClusterLists.add("SC-1"); + subClusterLists.add("SC-2"); + + // generate queue A, queue B, queue C + FederationQueueWeight rootA = generateFederationQueueWeight("root.a", subClusterLists); + FederationQueueWeight rootB = generateFederationQueueWeight("root.b", subClusterLists); + FederationQueueWeight rootC = generateFederationQueueWeight("root.c", subClusterLists); + + List<FederationQueueWeight> federationQueueWeights = new ArrayList<>(); + federationQueueWeights.add(rootA); + federationQueueWeights.add(rootB); + federationQueueWeights.add(rootC); + + // Step1. Save Queue Policies in Batches + BatchSaveFederationQueuePoliciesRequest request = + BatchSaveFederationQueuePoliciesRequest.newInstance(federationQueueWeights); + + BatchSaveFederationQueuePoliciesResponse policiesResponse = + interceptor.batchSaveFederationQueuePolicies(request); + + assertNotNull(policiesResponse); + assertNotNull(policiesResponse.getMessage()); + assertEquals("batch save policies success.", policiesResponse.getMessage()); + + // Step2. Delete the policy of root.c + List<String> deleteQueues = new ArrayList<>(); + deleteQueues.add("root.c"); + DeleteFederationQueuePoliciesRequest deleteRequest = + DeleteFederationQueuePoliciesRequest.newInstance(deleteQueues); + DeleteFederationQueuePoliciesResponse deleteResponse = + interceptor.deleteFederationPoliciesByQueues(deleteRequest); + assertNotNull(deleteResponse); + String message = deleteResponse.getMessage(); + assertEquals("queues = root.c delete success.", message); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org