YARN-5467. InputValidator for the FederationStateStore internal APIs. (Giovanni Matteo Fumarola via Subru)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/990c5639 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/990c5639 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/990c5639 Branch: refs/heads/YARN-2915 Commit: 990c5639599deef564892f505b2b476405feb76b Parents: 4a8e2cb Author: Subru Krishnan <su...@apache.org> Authored: Wed Aug 17 12:07:06 2016 -0700 Committer: Subru Krishnan <su...@apache.org> Committed: Mon Oct 24 19:21:36 2016 -0700 ---------------------------------------------------------------------- .../store/impl/MemoryFederationStateStore.java | 30 + ...cationHomeSubClusterStoreInputValidator.java | 183 +++ ...ationMembershipStateStoreInputValidator.java | 317 +++++ .../FederationPolicyStoreInputValidator.java | 144 ++ ...derationStateStoreInvalidInputException.java | 48 + .../federation/store/utils/package-info.java | 17 + .../impl/FederationStateStoreBaseTest.java | 6 +- .../TestFederationStateStoreInputValidator.java | 1265 ++++++++++++++++++ 8 files changed, 2007 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/990c5639/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java index 8144435..6e564dc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java @@ -57,6 +57,9 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegister import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse; import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.utils.FederationApplicationHomeSubClusterStoreInputValidator; +import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembershipStateStoreInputValidator; +import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator; import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.util.MonotonicClock; @@ -88,6 +91,8 @@ public class MemoryFederationStateStore implements FederationStateStore { @Override public SubClusterRegisterResponse registerSubCluster( SubClusterRegisterRequest request) throws YarnException { + FederationMembershipStateStoreInputValidator + .validateSubClusterRegisterRequest(request); SubClusterInfo subClusterInfo = request.getSubClusterInfo(); membership.put(subClusterInfo.getSubClusterId(), subClusterInfo); return SubClusterRegisterResponse.newInstance(); @@ -96,6 +101,8 @@ public class MemoryFederationStateStore implements FederationStateStore { @Override public SubClusterDeregisterResponse deregisterSubCluster( SubClusterDeregisterRequest request) throws YarnException { + FederationMembershipStateStoreInputValidator + .validateSubClusterDeregisterRequest(request); SubClusterInfo subClusterInfo = membership.get(request.getSubClusterId()); if (subClusterInfo == null) { throw new YarnException( @@ -111,6 +118,8 @@ public class MemoryFederationStateStore implements FederationStateStore { public SubClusterHeartbeatResponse subClusterHeartbeat( SubClusterHeartbeatRequest request) throws YarnException { + FederationMembershipStateStoreInputValidator + .validateSubClusterHeartbeatRequest(request); SubClusterId subClusterId = request.getSubClusterId(); SubClusterInfo subClusterInfo = membership.get(subClusterId); @@ -129,6 +138,9 @@ public class MemoryFederationStateStore implements FederationStateStore { @Override public GetSubClusterInfoResponse getSubCluster( GetSubClusterInfoRequest request) throws YarnException { + + FederationMembershipStateStoreInputValidator + .validateGetSubClusterInfoRequest(request); SubClusterId subClusterId = request.getSubClusterId(); if (!membership.containsKey(subClusterId)) { throw new YarnException( @@ -157,6 +169,9 @@ public class MemoryFederationStateStore implements FederationStateStore { @Override public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster( AddApplicationHomeSubClusterRequest request) throws YarnException { + + FederationApplicationHomeSubClusterStoreInputValidator + .validateAddApplicationHomeSubClusterRequest(request); ApplicationId appId = request.getApplicationHomeSubCluster().getApplicationId(); @@ -172,6 +187,9 @@ public class MemoryFederationStateStore implements FederationStateStore { @Override public UpdateApplicationHomeSubClusterResponse updateApplicationHomeSubCluster( UpdateApplicationHomeSubClusterRequest request) throws YarnException { + + FederationApplicationHomeSubClusterStoreInputValidator + .validateUpdateApplicationHomeSubClusterRequest(request); ApplicationId appId = request.getApplicationHomeSubCluster().getApplicationId(); if (!applications.containsKey(appId)) { @@ -186,6 +204,9 @@ public class MemoryFederationStateStore implements FederationStateStore { @Override public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster( GetApplicationHomeSubClusterRequest request) throws YarnException { + + FederationApplicationHomeSubClusterStoreInputValidator + .validateGetApplicationHomeSubClusterRequest(request); ApplicationId appId = request.getApplicationId(); if (!applications.containsKey(appId)) { throw new YarnException("Application " + appId + " does not exist"); @@ -212,6 +233,9 @@ public class MemoryFederationStateStore implements FederationStateStore { @Override public DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubCluster( DeleteApplicationHomeSubClusterRequest request) throws YarnException { + + FederationApplicationHomeSubClusterStoreInputValidator + .validateDeleteApplicationHomeSubClusterRequest(request); ApplicationId appId = request.getApplicationId(); if (!applications.containsKey(appId)) { throw new YarnException("Application " + appId + " does not exist"); @@ -224,6 +248,9 @@ public class MemoryFederationStateStore implements FederationStateStore { @Override public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration( GetSubClusterPolicyConfigurationRequest request) throws YarnException { + + FederationPolicyStoreInputValidator + .validateGetSubClusterPolicyConfigurationRequest(request); String queue = request.getQueue(); if (!policies.containsKey(queue)) { throw new YarnException("Policy for queue " + queue + " does not exist"); @@ -236,6 +263,9 @@ public class MemoryFederationStateStore implements FederationStateStore { @Override public SetSubClusterPolicyConfigurationResponse setPolicyConfiguration( SetSubClusterPolicyConfigurationRequest request) throws YarnException { + + FederationPolicyStoreInputValidator + .validateSetSubClusterPolicyConfigurationRequest(request); policies.put(request.getPolicyConfiguration().getQueue(), request.getPolicyConfiguration()); return SetSubClusterPolicyConfigurationResponse.newInstance(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/990c5639/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationApplicationHomeSubClusterStoreInputValidator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationApplicationHomeSubClusterStoreInputValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationApplicationHomeSubClusterStoreInputValidator.java new file mode 100644 index 0000000..c14a452 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationApplicationHomeSubClusterStoreInputValidator.java @@ -0,0 +1,183 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.store.utils; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; +import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Utility class to validate the inputs to + * {@code FederationApplicationHomeSubClusterStore}, allows a fail fast + * mechanism for invalid user inputs. + * + */ +public final class FederationApplicationHomeSubClusterStoreInputValidator { + + private static final Logger LOG = LoggerFactory + .getLogger(FederationApplicationHomeSubClusterStoreInputValidator.class); + + private FederationApplicationHomeSubClusterStoreInputValidator() { + } + + /** + * Quick validation on the input to check some obvious fail conditions (fail + * fast). Check if the provided {@link AddApplicationHomeSubClusterRequest} + * for adding a new application is valid or not. + * + * @param request the {@link AddApplicationHomeSubClusterRequest} to validate + * against + * @throws FederationStateStoreInvalidInputException if the request is invalid + */ + public static void validateAddApplicationHomeSubClusterRequest( + AddApplicationHomeSubClusterRequest request) + throws FederationStateStoreInvalidInputException { + if (request == null) { + String message = "Missing AddApplicationHomeSubCluster Request." + + " Please try again by specifying" + + " an AddApplicationHomeSubCluster information."; + LOG.warn(message); + throw new FederationStateStoreInvalidInputException(message); + } + + // validate ApplicationHomeSubCluster info + checkApplicationHomeSubCluster(request.getApplicationHomeSubCluster()); + } + + /** + * Quick validation on the input to check some obvious fail conditions (fail + * fast). Check if the provided {@link UpdateApplicationHomeSubClusterRequest} + * for updating an application is valid or not. + * + * @param request the {@link UpdateApplicationHomeSubClusterRequest} to + * validate against + * @throws FederationStateStoreInvalidInputException if the request is invalid + */ + public static void validateUpdateApplicationHomeSubClusterRequest( + UpdateApplicationHomeSubClusterRequest request) + throws FederationStateStoreInvalidInputException { + if (request == null) { + String message = "Missing UpdateApplicationHomeSubCluster Request." + + " Please try again by specifying" + + " an ApplicationHomeSubCluster information."; + LOG.warn(message); + throw new FederationStateStoreInvalidInputException(message); + } + + // validate ApplicationHomeSubCluster info + checkApplicationHomeSubCluster(request.getApplicationHomeSubCluster()); + } + + /** + * Quick validation on the input to check some obvious fail conditions (fail + * fast). Check if the provided {@link GetApplicationHomeSubClusterRequest} + * for querying application's information is valid or not. + * + * @param request the {@link GetApplicationHomeSubClusterRequest} to validate + * against + * @throws FederationStateStoreInvalidInputException if the request is invalid + */ + public static void validateGetApplicationHomeSubClusterRequest( + GetApplicationHomeSubClusterRequest request) + throws FederationStateStoreInvalidInputException { + if (request == null) { + String message = "Missing GetApplicationHomeSubCluster Request." + + " Please try again by specifying an Application Id information."; + LOG.warn(message); + throw new FederationStateStoreInvalidInputException(message); + } + + // validate application Id + checkApplicationId(request.getApplicationId()); + } + + /** + * Quick validation on the input to check some obvious fail conditions (fail + * fast). Check if the provided {@link DeleteApplicationHomeSubClusterRequest} + * for deleting an application is valid or not. + * + * @param request the {@link DeleteApplicationHomeSubClusterRequest} to + * validate against + * @throws FederationStateStoreInvalidInputException if the request is invalid + */ + public static void validateDeleteApplicationHomeSubClusterRequest( + DeleteApplicationHomeSubClusterRequest request) + throws FederationStateStoreInvalidInputException { + if (request == null) { + String message = "Missing DeleteApplicationHomeSubCluster Request." + + " Please try again by specifying" + + " an ApplicationHomeSubCluster information."; + LOG.warn(message); + throw new FederationStateStoreInvalidInputException(message); + } + + // validate application Id + checkApplicationId(request.getApplicationId()); + } + + /** + * Validate if the ApplicationHomeSubCluster info are present or not. + * + * @param applicationHomeSubCluster the information of the application to be + * verified + * @throws FederationStateStoreInvalidInputException if the SubCluster Info + * are invalid + */ + private static void checkApplicationHomeSubCluster( + ApplicationHomeSubCluster applicationHomeSubCluster) + + throws FederationStateStoreInvalidInputException { + if (applicationHomeSubCluster == null) { + String message = "Missing ApplicationHomeSubCluster Info." + + " Please try again by specifying" + + " an ApplicationHomeSubCluster information."; + LOG.warn(message); + throw new FederationStateStoreInvalidInputException(message); + } + // validate application Id + checkApplicationId(applicationHomeSubCluster.getApplicationId()); + + // validate subcluster Id + FederationMembershipStateStoreInputValidator + .checkSubClusterId(applicationHomeSubCluster.getHomeSubCluster()); + + } + + /** + * Validate if the application id is present or not. + * + * @param appId the id of the application to be verified + * @throws FederationStateStoreInvalidInputException if the application Id is + * invalid + */ + private static void checkApplicationId(ApplicationId appId) + throws FederationStateStoreInvalidInputException { + if (appId == null) { + String message = "Missing Application Id." + + " Please try again by specifying an Application Id."; + LOG.warn(message); + throw new FederationStateStoreInvalidInputException(message); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/990c5639/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationMembershipStateStoreInputValidator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationMembershipStateStoreInputValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationMembershipStateStoreInputValidator.java new file mode 100644 index 0000000..b587ee5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationMembershipStateStoreInputValidator.java @@ -0,0 +1,317 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.store.utils; + +import java.net.URI; + +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Utility class to validate the inputs to + * {@code FederationMembershipStateStore}, allows a fail fast mechanism for + * invalid user inputs. + * + */ +public final class FederationMembershipStateStoreInputValidator { + + private static final Logger LOG = LoggerFactory + .getLogger(FederationMembershipStateStoreInputValidator.class); + + private FederationMembershipStateStoreInputValidator() { + } + + /** + * Quick validation on the input to check some obvious fail conditions (fail + * fast). Check if the provided {@link SubClusterRegisterRequest} for + * registration a new subcluster is valid or not. + * + * @param request the {@link SubClusterRegisterRequest} to validate against + * @throws FederationStateStoreInvalidInputException if the request is invalid + */ + public static void validateSubClusterRegisterRequest( + SubClusterRegisterRequest request) + throws FederationStateStoreInvalidInputException { + + // check if the request is present + if (request == null) { + String message = "Missing SubClusterRegister Request." + + " Please try again by specifying a" + + " SubCluster Register Information."; + LOG.warn(message); + throw new FederationStateStoreInvalidInputException(message); + + } + + // validate subcluster info + checkSubClusterInfo(request.getSubClusterInfo()); + } + + /** + * Quick validation on the input to check some obvious fail conditions (fail + * fast). Check if the provided {@link SubClusterDeregisterRequest} for + * deregistration a subcluster is valid or not. + * + * @param request the {@link SubClusterDeregisterRequest} to validate against + * @throws FederationStateStoreInvalidInputException if the request is invalid + */ + public static void validateSubClusterDeregisterRequest( + SubClusterDeregisterRequest request) + throws FederationStateStoreInvalidInputException { + + // check if the request is present + if (request == null) { + String message = "Missing SubClusterDeregister Request." + + " Please try again by specifying a" + + " SubCluster Deregister Information."; + LOG.warn(message); + throw new FederationStateStoreInvalidInputException(message); + } + + // validate subcluster id + checkSubClusterId(request.getSubClusterId()); + // validate subcluster state + checkSubClusterState(request.getState()); + if (!request.getState().isFinal()) { + String message = "Invalid non-final state: " + request.getState(); + LOG.warn(message); + throw new FederationStateStoreInvalidInputException(message); + } + } + + /** + * Quick validation on the input to check some obvious fail conditions (fail + * fast). Check if the provided {@link SubClusterHeartbeatRequest} for + * heartbeating a subcluster is valid or not. + * + * @param request the {@link SubClusterHeartbeatRequest} to validate against + * @throws FederationStateStoreInvalidInputException if the request is invalid + */ + public static void validateSubClusterHeartbeatRequest( + SubClusterHeartbeatRequest request) + throws FederationStateStoreInvalidInputException { + + // check if the request is present + if (request == null) { + String message = "Missing SubClusterHeartbeat Request." + + " Please try again by specifying a" + + " SubCluster Heartbeat Information."; + LOG.warn(message); + throw new FederationStateStoreInvalidInputException(message); + } + + // validate subcluster id + checkSubClusterId(request.getSubClusterId()); + // validate last heartbeat timestamp + checkTimestamp(request.getLastHeartBeat()); + // validate subcluster capability + checkCapability(request.getCapability()); + // validate subcluster state + checkSubClusterState(request.getState()); + + } + + /** + * Quick validation on the input to check some obvious fail conditions (fail + * fast). Check if the provided {@link GetSubClusterInfoRequest} for querying + * subcluster's information is valid or not. + * + * @param request the {@link GetSubClusterInfoRequest} to validate against + * @throws FederationStateStoreInvalidInputException if the request is invalid + */ + public static void validateGetSubClusterInfoRequest( + GetSubClusterInfoRequest request) + throws FederationStateStoreInvalidInputException { + + // check if the request is present + if (request == null) { + String message = "Missing GetSubClusterInfo Request." + + " Please try again by specifying a Get SubCluster information."; + LOG.warn(message); + throw new FederationStateStoreInvalidInputException(message); + } + + // validate subcluster id + checkSubClusterId(request.getSubClusterId()); + } + + /** + * Validate if the SubCluster Info are present or not. + * + * @param subClusterInfo the information of the subcluster to be verified + * @throws FederationStateStoreInvalidInputException if the SubCluster Info + * are invalid + */ + private static void checkSubClusterInfo(SubClusterInfo subClusterInfo) + throws FederationStateStoreInvalidInputException { + if (subClusterInfo == null) { + String message = "Missing SubCluster Information." + + " Please try again by specifying SubCluster Information."; + LOG.warn(message); + throw new FederationStateStoreInvalidInputException(message); + } + + // validate subcluster id + checkSubClusterId(subClusterInfo.getSubClusterId()); + + // validate AMRM Service address + checkAddress(subClusterInfo.getAMRMServiceAddress()); + // validate ClientRM Service address + checkAddress(subClusterInfo.getClientRMServiceAddress()); + // validate RMClient Service address + checkAddress(subClusterInfo.getRMAdminServiceAddress()); + // validate RMWeb Service address + checkAddress(subClusterInfo.getRMWebServiceAddress()); + + // validate last heartbeat timestamp + checkTimestamp(subClusterInfo.getLastHeartBeat()); + // validate last start timestamp + checkTimestamp(subClusterInfo.getLastStartTime()); + + // validate subcluster state + checkSubClusterState(subClusterInfo.getState()); + + // validate subcluster capability + checkCapability(subClusterInfo.getCapability()); + } + + /** + * Validate if the timestamp is positive or not. + * + * @param timestamp the timestamp to be verified + * @throws FederationStateStoreInvalidInputException if the timestamp is + * invalid + */ + private static void checkTimestamp(long timestamp) + throws FederationStateStoreInvalidInputException { + if (timestamp < 0) { + String message = "Invalid timestamp information." + + " Please try again by specifying valid Timestamp Information."; + LOG.warn(message); + throw new FederationStateStoreInvalidInputException(message); + } + } + + /** + * Validate if the Capability is present or not. + * + * @param capability the capability of the subcluster to be verified + * @throws FederationStateStoreInvalidInputException if the capability is + * invalid + */ + private static void checkCapability(String capability) + throws FederationStateStoreInvalidInputException { + if (capability == null || capability.isEmpty()) { + String message = "Invalid capability information." + + " Please try again by specifying valid Capability Information."; + LOG.warn(message); + throw new FederationStateStoreInvalidInputException(message); + } + } + + /** + * Validate if the SubCluster Id is present or not. + * + * @param subClusterId the identifier of the subcluster to be verified + * @throws FederationStateStoreInvalidInputException if the SubCluster Id is + * invalid + */ + protected static void checkSubClusterId(SubClusterId subClusterId) + throws FederationStateStoreInvalidInputException { + // check if cluster id is present + if (subClusterId == null) { + String message = "Missing SubCluster Id information." + + " Please try again by specifying Subcluster Id information."; + LOG.warn(message); + throw new FederationStateStoreInvalidInputException(message); + } + // check if cluster id is valid + if (subClusterId.getId().isEmpty()) { + String message = "Invalid SubCluster Id information." + + " Please try again by specifying valid Subcluster Id."; + LOG.warn(message); + throw new FederationStateStoreInvalidInputException(message); + } + } + + /** + * Validate if the SubCluster Address is a valid URL or not. + * + * @param address the endpoint of the subcluster to be verified + * @throws FederationStateStoreInvalidInputException if the address is invalid + */ + private static void checkAddress(String address) + throws FederationStateStoreInvalidInputException { + // Ensure url is not null + if (address == null || address.isEmpty()) { + String message = "Missing SubCluster Endpoint information." + + " Please try again by specifying SubCluster Endpoint information."; + LOG.warn(message); + throw new FederationStateStoreInvalidInputException(message); + } + // Validate url is well formed + boolean hasScheme = address.contains("://"); + URI uri = null; + try { + uri = hasScheme ? URI.create(address) + : URI.create("dummyscheme://" + address); + } catch (IllegalArgumentException e) { + String message = "The provided SubCluster Endpoint does not contain a" + + " valid host:port authority: " + address; + LOG.warn(message); + throw new FederationStateStoreInvalidInputException(message); + } + String host = uri.getHost(); + int port = uri.getPort(); + String path = uri.getPath(); + if ((host == null) || (port < 0) + || (!hasScheme && path != null && !path.isEmpty())) { + String message = "The provided SubCluster Endpoint does not contain a" + + " valid host:port authority: " + address; + LOG.warn(message); + throw new FederationStateStoreInvalidInputException(message); + } + } + + /** + * Validate if the SubCluster State is present or not. + * + * @param state the state of the subcluster to be verified + * @throws FederationStateStoreInvalidInputException if the SubCluster State + * is invalid + */ + private static void checkSubClusterState(SubClusterState state) + throws FederationStateStoreInvalidInputException { + // check sub-cluster state is not empty + if (state == null) { + String message = "Missing SubCluster State information." + + " Please try again by specifying SubCluster State information."; + LOG.warn(message); + throw new FederationStateStoreInvalidInputException(message); + } + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/990c5639/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationPolicyStoreInputValidator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationPolicyStoreInputValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationPolicyStoreInputValidator.java new file mode 100644 index 0000000..273a8ac --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationPolicyStoreInputValidator.java @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.store.utils; + +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Utility class to validate the inputs to {@code FederationPolicyStore}, allows + * a fail fast mechanism for invalid user inputs. + * + */ +public final class FederationPolicyStoreInputValidator { + + private static final Logger LOG = + LoggerFactory.getLogger(FederationPolicyStoreInputValidator.class); + + private FederationPolicyStoreInputValidator() { + } + + /** + * Quick validation on the input to check some obvious fail conditions (fail + * fast). Check if the provided + * {@link GetSubClusterPolicyConfigurationRequest} for querying policy's + * information is valid or not. + * + * @param request the {@link GetSubClusterPolicyConfigurationRequest} to + * validate against + * @throws FederationStateStoreInvalidInputException if the request is invalid + */ + public static void validateGetSubClusterPolicyConfigurationRequest( + GetSubClusterPolicyConfigurationRequest request) + throws FederationStateStoreInvalidInputException { + if (request == null) { + String message = "Missing GetSubClusterPolicyConfiguration Request." + + " Please try again by specifying a policy selection information."; + LOG.warn(message); + throw new FederationStateStoreInvalidInputException(message); + } + + // validate queue id + checkQueue(request.getQueue()); + } + + /** + * Quick validation on the input to check some obvious fail conditions (fail + * fast). Check if the provided + * {@link SetSubClusterPolicyConfigurationRequest} for adding a new policy is + * valid or not. + * + * @param request the {@link SetSubClusterPolicyConfigurationRequest} to + * validate against + * @throws FederationStateStoreInvalidInputException if the request is invalid + */ + public static void validateSetSubClusterPolicyConfigurationRequest( + SetSubClusterPolicyConfigurationRequest request) + throws FederationStateStoreInvalidInputException { + if (request == null) { + String message = "Missing SetSubClusterPolicyConfiguration Request." + + " Please try again by specifying an policy insertion information."; + LOG.warn(message); + throw new FederationStateStoreInvalidInputException(message); + } + + // validate subcluster policy configuration + checkSubClusterPolicyConfiguration(request.getPolicyConfiguration()); + } + + /** + * Validate if the SubClusterPolicyConfiguration is valid or not. + * + * @param policyConfiguration the policy information to be verified + * @throws FederationStateStoreInvalidInputException if the policy information + * are invalid + */ + private static void checkSubClusterPolicyConfiguration( + SubClusterPolicyConfiguration policyConfiguration) + throws FederationStateStoreInvalidInputException { + if (policyConfiguration == null) { + String message = "Missing SubClusterPolicyConfiguration." + + " Please try again by specifying a SubClusterPolicyConfiguration."; + LOG.warn(message); + throw new FederationStateStoreInvalidInputException(message); + } + + // validate queue id + checkQueue(policyConfiguration.getQueue()); + // validate policy type + checkType(policyConfiguration.getType()); + + } + + /** + * Validate if the queue id is a valid or not. + * + * @param queue the queue id of the policy to be verified + * @throws FederationStateStoreInvalidInputException if the queue id is + * invalid + */ + private static void checkQueue(String queue) + throws FederationStateStoreInvalidInputException { + if (queue == null || queue.isEmpty()) { + String message = "Missing Queue. Please try again by specifying a Queue."; + LOG.warn(message); + throw new FederationStateStoreInvalidInputException(message); + } + } + + /** + * Validate if the policy type is a valid or not. + * + * @param type the type of the policy to be verified + * @throws FederationStateStoreInvalidInputException if the policy is invalid + */ + private static void checkType(String type) + throws FederationStateStoreInvalidInputException { + if (type == null || type.isEmpty()) { + String message = "Missing Policy Type." + + " Please try again by specifying a Policy Type."; + LOG.warn(message); + throw new FederationStateStoreInvalidInputException(message); + } + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/990c5639/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreInvalidInputException.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreInvalidInputException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreInvalidInputException.java new file mode 100644 index 0000000..ea1428d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreInvalidInputException.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.store.utils; + +import org.apache.hadoop.yarn.exceptions.YarnException; + +/** + * Exception thrown by the {@link FederationMembershipStateStoreInputValidator}, + * {@link FederationApplicationHomeSubClusterStoreInputValidator}, + * {@link FederationPolicyStoreInputValidator} if the input is invalid. + * + */ +public class FederationStateStoreInvalidInputException extends YarnException { + + /** + * IDE auto-generated. + */ + private static final long serialVersionUID = -7352144682711430801L; + + public FederationStateStoreInvalidInputException(Throwable cause) { + super(cause); + } + + public FederationStateStoreInvalidInputException(String message) { + super(message); + } + + public FederationStateStoreInvalidInputException(String message, + Throwable cause) { + super(message, cause); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/990c5639/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/package-info.java new file mode 100644 index 0000000..f4a9c7e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/package-info.java @@ -0,0 +1,17 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.yarn.server.federation.store.utils; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/990c5639/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java index 414696b..63a5b65 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java @@ -162,9 +162,9 @@ public abstract class FederationStateStoreBaseTest { SubClusterRegisterRequest.newInstance(subClusterInfo2)); stateStore.subClusterHeartbeat(SubClusterHeartbeatRequest - .newInstance(subClusterId1, SubClusterState.SC_RUNNING, "")); - stateStore.subClusterHeartbeat(SubClusterHeartbeatRequest - .newInstance(subClusterId2, SubClusterState.SC_UNHEALTHY, "")); + .newInstance(subClusterId1, SubClusterState.SC_RUNNING, "capability")); + stateStore.subClusterHeartbeat(SubClusterHeartbeatRequest.newInstance( + subClusterId2, SubClusterState.SC_UNHEALTHY, "capability")); Assert.assertTrue( stateStore.getSubClusters(GetSubClustersInfoRequest.newInstance(true)) --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org