YARN-5602. Utils for Federation State and Policy Store. (Giovanni Matteo Fumarola via Subru).
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2a01b06e Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2a01b06e Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2a01b06e Branch: refs/heads/YARN-2915 Commit: 2a01b06e9cbd5fb047707bc6e4ce17aabd4d99ba Parents: 734156b Author: Subru Krishnan <su...@apache.org> Authored: Wed Apr 5 15:02:00 2017 -0700 Committer: Subru Krishnan <su...@apache.org> Committed: Wed Jun 7 14:46:41 2017 -0700 ---------------------------------------------------------------------- .../FederationStateStoreErrorCode.java | 105 +++++++++++++ .../FederationStateStoreException.java | 45 ++++++ ...derationStateStoreInvalidInputException.java | 48 ++++++ .../FederationStateStoreRetriableException.java | 44 ++++++ .../store/exception/package-info.java | 17 ++ .../store/impl/MemoryFederationStateStore.java | 56 +++++-- .../store/records/SubClusterInfo.java | 62 ++++++++ .../records/impl/pb/SubClusterInfoPBImpl.java | 16 -- ...cationHomeSubClusterStoreInputValidator.java | 1 + ...ationMembershipStateStoreInputValidator.java | 1 + .../FederationPolicyStoreInputValidator.java | 1 + ...derationStateStoreInvalidInputException.java | 48 ------ .../store/utils/FederationStateStoreUtils.java | 155 +++++++++++++++++++ .../utils/FederationStateStoreFacade.java | 23 ++- .../impl/FederationStateStoreBaseTest.java | 91 ++++++----- .../impl/TestMemoryFederationStateStore.java | 4 +- .../TestFederationStateStoreInputValidator.java | 1 + .../TestFederationStateStoreFacadeRetry.java | 125 +++++++++++++++ 18 files changed, 730 insertions(+), 113 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a01b06e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateStoreErrorCode.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateStoreErrorCode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateStoreErrorCode.java new file mode 100644 index 0000000..88e2d3a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateStoreErrorCode.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.store.exception; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +/** + * <p> + * Logical error codes from <code>FederationStateStore</code>. Ids are grouped by table: 11xx for Membership, 12xx for ApplicationsHomeSubCluster and 13xx for Policy. 
+ * </p> + */ +@Public +@Unstable +public enum FederationStateStoreErrorCode { + + MEMBERSHIP_INSERT_FAIL(1101, "Fail to insert a tuple into Membership table."), + + MEMBERSHIP_DELETE_FAIL(1102, "Fail to delete a tuple from Membership table."), + + MEMBERSHIP_SINGLE_SELECT_FAIL(1103, + "Fail to select a tuple from Membership table."), + + MEMBERSHIP_MULTIPLE_SELECT_FAIL(1104, + "Fail to select multiple tuples from Membership table."), + + MEMBERSHIP_UPDATE_DEREGISTER_FAIL(1105, + "Fail to update/deregister a tuple in Membership table."), + + MEMBERSHIP_UPDATE_HEARTBEAT_FAIL(1106, + "Fail to update/heartbeat a tuple in Membership table."), + + APPLICATIONS_INSERT_FAIL(1201, + "Fail to insert a tuple into ApplicationsHomeSubCluster table."), + + APPLICATIONS_DELETE_FAIL(1202, + "Fail to delete a tuple from ApplicationsHomeSubCluster table."), + + APPLICATIONS_SINGLE_SELECT_FAIL(1203, + "Fail to select a tuple from ApplicationsHomeSubCluster table."), + + APPLICATIONS_MULTIPLE_SELECT_FAIL(1204, + "Fail to select multiple tuples from ApplicationsHomeSubCluster table."), + + APPLICATIONS_UPDATE_FAIL(1205, + "Fail to update a tuple in ApplicationsHomeSubCluster table."), + + POLICY_INSERT_FAIL(1301, "Fail to insert a tuple into Policy table."), + + POLICY_DELETE_FAIL(1302, "Fail to delete a tuple from Policy table."), + + POLICY_SINGLE_SELECT_FAIL(1303, "Fail to select a tuple from Policy table."), + + POLICY_MULTIPLE_SELECT_FAIL(1304, + "Fail to select multiple tuples from Policy table."), + + POLICY_UPDATE_FAIL(1305, "Fail to update a tuple in Policy table."); + + private final int id; + private final String msg; + + FederationStateStoreErrorCode(int id, String msg) { + this.id = id; + this.msg = msg; + } + + /** + * Get the error code related to the FederationStateStore failure. + * + * @return the error code related to the FederationStateStore failure. 
+ */ + public int getId() { + return this.id; + } + + /** + * Get the error message related to the FederationStateStore failure. + * + * @return the error message related to the FederationStateStore failure. + */ + public String getMsg() { + return this.msg; + } + + @Override + public String toString() { + return "\nError Code: " + this.id + "\nError Message: " + this.msg; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a01b06e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateStoreException.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateStoreException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateStoreException.java new file mode 100644 index 0000000..81a9e99 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateStoreException.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.store.exception; + +import org.apache.hadoop.yarn.exceptions.YarnException; + +/** + * Exception thrown by the <code>FederationStateStore</code>. + * It carries the {@link FederationStateStoreErrorCode} of the failed operation and uses that code's message as the exception message. + */ +public class FederationStateStoreException extends YarnException { + + /** + * IDE auto-generated. + */ + private static final long serialVersionUID = -6453353714832159296L; + + private final FederationStateStoreErrorCode code; + + public FederationStateStoreException(FederationStateStoreErrorCode code) { + super(code == null ? 
null : code.getMsg()); + this.code = code; + } + + public FederationStateStoreErrorCode getCode() { + return code; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a01b06e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateStoreInvalidInputException.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateStoreInvalidInputException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateStoreInvalidInputException.java new file mode 100644 index 0000000..edf7837 --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateStoreInvalidInputException.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.store.exception; + +import org.apache.hadoop.yarn.exceptions.YarnException; + +/** + * Exception thrown by the {@code FederationMembershipStateStoreInputValidator}, + * {@code FederationApplicationHomeSubClusterStoreInputValidator}, + * {@code FederationPolicyStoreInputValidator} if the input is invalid. + * Each constructor forwards its arguments unchanged to {@link YarnException}. + */ +public class FederationStateStoreInvalidInputException extends YarnException { + + /** + * Serialization version id (IDE auto-generated). 
+ */ + private static final long serialVersionUID = -7352144682711430801L; + + public FederationStateStoreInvalidInputException(Throwable cause) { + super(cause); + } + + public FederationStateStoreInvalidInputException(String message) { + super(message); + } + + public FederationStateStoreInvalidInputException(String message, + Throwable cause) { + super(message, cause); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a01b06e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateStoreRetriableException.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateStoreRetriableException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateStoreRetriableException.java new file mode 100644 index 0000000..19d6750 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateStoreRetriableException.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.store.exception; + +import org.apache.hadoop.yarn.exceptions.YarnException; + +/** + * Exception thrown by the {@code FederationStateStore} to signal a failure + * that is retriable. + * Each constructor forwards its arguments unchanged to {@link YarnException}. + */ +public class FederationStateStoreRetriableException extends YarnException { + + private static final long serialVersionUID = 1L; + + public FederationStateStoreRetriableException(Throwable cause) { + super(cause); + } + + public FederationStateStoreRetriableException(String message) { + super(message); + } + + public FederationStateStoreRetriableException(String message, + Throwable cause) { + super(message, cause); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a01b06e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/package-info.java new file mode 100644 index 0000000..727606f --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/package-info.java @@ -0,0 +1,17 @@ +/** + * 
Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.yarn.server.federation.store.exception; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a01b06e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java index 6e564dc..127bf82 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java @@ -30,6 +30,7 @@ import 
org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreErrorCode; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterResponse; import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest; @@ -60,8 +61,11 @@ import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationH import org.apache.hadoop.yarn.server.federation.store.utils.FederationApplicationHomeSubClusterStoreInputValidator; import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembershipStateStoreInputValidator; import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator; +import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils; import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.util.MonotonicClock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * In-memory implementation of {@link FederationStateStore}. 
@@ -74,6 +78,9 @@ public class MemoryFederationStateStore implements FederationStateStore { private final MonotonicClock clock = new MonotonicClock(); + public static final Logger LOG = + LoggerFactory.getLogger(MemoryFederationStateStore.class); + @Override public void init(Configuration conf) { membership = new ConcurrentHashMap<SubClusterId, SubClusterInfo>(); @@ -94,7 +101,17 @@ public class MemoryFederationStateStore implements FederationStateStore { FederationMembershipStateStoreInputValidator .validateSubClusterRegisterRequest(request); SubClusterInfo subClusterInfo = request.getSubClusterInfo(); - membership.put(subClusterInfo.getSubClusterId(), subClusterInfo); + + SubClusterInfo subClusterInfoToSave = + SubClusterInfo.newInstance(subClusterInfo.getSubClusterId(), + subClusterInfo.getAMRMServiceAddress(), + subClusterInfo.getClientRMServiceAddress(), + subClusterInfo.getRMAdminServiceAddress(), + subClusterInfo.getRMWebServiceAddress(), clock.getTime(), + subClusterInfo.getState(), subClusterInfo.getLastStartTime(), + subClusterInfo.getCapability()); + + membership.put(subClusterInfo.getSubClusterId(), subClusterInfoToSave); return SubClusterRegisterResponse.newInstance(); } @@ -105,8 +122,11 @@ public class MemoryFederationStateStore implements FederationStateStore { .validateSubClusterDeregisterRequest(request); SubClusterInfo subClusterInfo = membership.get(request.getSubClusterId()); if (subClusterInfo == null) { - throw new YarnException( - "SubCluster " + request.getSubClusterId().toString() + " not found"); + String errMsg = + "SubCluster " + request.getSubClusterId().toString() + " not found"; + FederationStateStoreUtils.logAndThrowStoreException(LOG, + FederationStateStoreErrorCode.MEMBERSHIP_UPDATE_DEREGISTER_FAIL, + errMsg); } else { subClusterInfo.setState(request.getState()); } @@ -124,8 +144,11 @@ public class MemoryFederationStateStore implements FederationStateStore { SubClusterInfo subClusterInfo = membership.get(subClusterId); if 
(subClusterInfo == null) { - throw new YarnException("Subcluster " + subClusterId.toString() - + " does not exist; cannot heartbeat"); + String errMsg = "Subcluster " + subClusterId.toString() + + " does not exist; cannot heartbeat"; + FederationStateStoreUtils.logAndThrowStoreException(LOG, + FederationStateStoreErrorCode.MEMBERSHIP_UPDATE_HEARTBEAT_FAIL, + errMsg); } subClusterInfo.setLastHeartBeat(clock.getTime()); @@ -143,8 +166,10 @@ public class MemoryFederationStateStore implements FederationStateStore { .validateGetSubClusterInfoRequest(request); SubClusterId subClusterId = request.getSubClusterId(); if (!membership.containsKey(subClusterId)) { - throw new YarnException( - "Subcluster " + subClusterId.toString() + " does not exist"); + String errMsg = + "Subcluster " + subClusterId.toString() + " does not exist"; + FederationStateStoreUtils.logAndThrowStoreException(LOG, + FederationStateStoreErrorCode.MEMBERSHIP_SINGLE_SELECT_FAIL, errMsg); } return GetSubClusterInfoResponse.newInstance(membership.get(subClusterId)); @@ -193,7 +218,9 @@ public class MemoryFederationStateStore implements FederationStateStore { ApplicationId appId = request.getApplicationHomeSubCluster().getApplicationId(); if (!applications.containsKey(appId)) { - throw new YarnException("Application " + appId + " does not exist"); + String errMsg = "Application " + appId + " does not exist"; + FederationStateStoreUtils.logAndThrowStoreException(LOG, + FederationStateStoreErrorCode.APPLICATIONS_UPDATE_FAIL, errMsg); } applications.put(appId, @@ -209,7 +236,10 @@ public class MemoryFederationStateStore implements FederationStateStore { .validateGetApplicationHomeSubClusterRequest(request); ApplicationId appId = request.getApplicationId(); if (!applications.containsKey(appId)) { - throw new YarnException("Application " + appId + " does not exist"); + String errMsg = "Application " + appId + " does not exist"; + FederationStateStoreUtils.logAndThrowStoreException(LOG, + 
FederationStateStoreErrorCode.APPLICATIONS_SINGLE_SELECT_FAIL, + errMsg); } return GetApplicationHomeSubClusterResponse.newInstance( @@ -238,7 +268,9 @@ public class MemoryFederationStateStore implements FederationStateStore { .validateDeleteApplicationHomeSubClusterRequest(request); ApplicationId appId = request.getApplicationId(); if (!applications.containsKey(appId)) { - throw new YarnException("Application " + appId + " does not exist"); + String errMsg = "Application " + appId + " does not exist"; + FederationStateStoreUtils.logAndThrowStoreException(LOG, + FederationStateStoreErrorCode.APPLICATIONS_DELETE_FAIL, errMsg); } applications.remove(appId); @@ -253,7 +285,9 @@ public class MemoryFederationStateStore implements FederationStateStore { .validateGetSubClusterPolicyConfigurationRequest(request); String queue = request.getQueue(); if (!policies.containsKey(queue)) { - throw new YarnException("Policy for queue " + queue + " does not exist"); + String errMsg = "Policy for queue " + queue + " does not exist"; + FederationStateStoreUtils.logAndThrowStoreException(LOG, + FederationStateStoreErrorCode.POLICY_SINGLE_SELECT_FAIL, errMsg); } return GetSubClusterPolicyConfigurationResponse http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a01b06e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterInfo.java index f13c8f1..cbf64e6 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterInfo.java @@ -260,4 +260,66 @@ public abstract class SubClusterInfo { + ", getState() = " + getState() + ", getLastStartTime() = " + getLastStartTime() + ", getCapability() = " + getCapability() + "]"; } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + SubClusterInfo other = (SubClusterInfo) obj; + if (!java.util.Objects.equals(this.getSubClusterId(), other.getSubClusterId())) { + return false; + } + if (!java.util.Objects.equals(this.getAMRMServiceAddress(), other.getAMRMServiceAddress())) { + return false; + } + if (!java.util.Objects.equals(this.getClientRMServiceAddress(), + other.getClientRMServiceAddress())) { + return false; + } + if (!java.util.Objects.equals(this.getRMAdminServiceAddress(), + other.getRMAdminServiceAddress())) { + return false; + } + if (!java.util.Objects.equals(this.getRMWebServiceAddress(), other.getRMWebServiceAddress())) { + return false; + } + if (!java.util.Objects.equals(this.getState(), other.getState())) { + return false; + } + return this.getLastStartTime() == other.getLastStartTime(); + // Capability and HeartBeat fields are deliberately not compared: they + // change during the lifetime of the same sub-cluster. Reference fields + // are compared null-safely, mirroring the null guards in hashCode() + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + + ((getSubClusterId() == null) ? 0 : getSubClusterId().hashCode()); + result = prime * result + ((getAMRMServiceAddress() == null) ? 
0 + : getAMRMServiceAddress().hashCode()); + result = prime * result + ((getClientRMServiceAddress() == null) ? 
0 + : getClientRMServiceAddress().hashCode()); + result = prime * result + ((getRMAdminServiceAddress() == null) ? 0 + : getRMAdminServiceAddress().hashCode()); + result = prime * result + ((getRMWebServiceAddress() == null) ? 0 + : getRMWebServiceAddress().hashCode()); + result = + prime * result + ((getState() == null) ? 0 : getState().hashCode()); + result = prime * result + + (int) (getLastStartTime() ^ (getLastStartTime() >>> 32)); + return result; + // Capability and HeartBeat fields are deliberately not hashed, matching + // equals(): they change during the lifetime of the same + // sub-cluster + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a01b06e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/SubClusterInfoPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/SubClusterInfoPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/SubClusterInfoPBImpl.java index b650b5f..cfdd038 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/SubClusterInfoPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/SubClusterInfoPBImpl.java @@ -82,22 +82,6 @@ public class SubClusterInfoPBImpl extends SubClusterInfo { } @Override - public int hashCode() { - return getProto().hashCode(); - } - - @Override - public boolean equals(Object other) { - if (other == null) { - return false; - } - if 
(other.getClass().isAssignableFrom(this.getClass())) { - return this.getProto().equals(this.getClass().cast(other).getProto()); - } - return false; - } - - @Override public String toString() { return TextFormat.shortDebugString(getProto()); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a01b06e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationApplicationHomeSubClusterStoreInputValidator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationApplicationHomeSubClusterStoreInputValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationApplicationHomeSubClusterStoreInputValidator.java index c14a452..d920144 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationApplicationHomeSubClusterStoreInputValidator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationApplicationHomeSubClusterStoreInputValidator.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.federation.store.utils; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException; import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest; 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a01b06e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationMembershipStateStoreInputValidator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationMembershipStateStoreInputValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationMembershipStateStoreInputValidator.java index ff9d8e9..ebe622b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationMembershipStateStoreInputValidator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationMembershipStateStoreInputValidator.java @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.federation.store.utils; import java.net.URI; +import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest; http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a01b06e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationPolicyStoreInputValidator.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationPolicyStoreInputValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationPolicyStoreInputValidator.java index 273a8ac..0df2d85 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationPolicyStoreInputValidator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationPolicyStoreInputValidator.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.federation.store.utils; +import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest; import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a01b06e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreInvalidInputException.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreInvalidInputException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreInvalidInputException.java deleted 
file mode 100644 index ea1428d..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreInvalidInputException.java +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.federation.store.utils; - -import org.apache.hadoop.yarn.exceptions.YarnException; - -/** - * Exception thrown by the {@link FederationMembershipStateStoreInputValidator}, - * {@link FederationApplicationHomeSubClusterStoreInputValidator}, - * {@link FederationPolicyStoreInputValidator} if the input is invalid. - * - */ -public class FederationStateStoreInvalidInputException extends YarnException { - - /** - * IDE auto-generated. 
- */ - private static final long serialVersionUID = -7352144682711430801L; - - public FederationStateStoreInvalidInputException(Throwable cause) { - super(cause); - } - - public FederationStateStoreInvalidInputException(String message) { - super(message); - } - - public FederationStateStoreInvalidInputException(String message, - Throwable cause) { - super(message, cause); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a01b06e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java new file mode 100644 index 0000000..7dbb20a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.store.utils; + +import java.sql.CallableStatement; +import java.sql.Connection; +import java.sql.SQLException; + +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreErrorCode; +import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreException; +import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException; +import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreRetriableException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Common utility methods used by the store implementations. + * + */ +public final class FederationStateStoreUtils { + + public static final Logger LOG = + LoggerFactory.getLogger(FederationStateStoreUtils.class); + + private FederationStateStoreUtils() { + } + + /** + * Returns the SQL <code>FederationStateStore</code> connection to the pool. 
+ * + * @param log the logger interface + * @param cstmt the interface used to execute SQL stored procedures + * @param conn the SQL connection + * @throws YarnException on failure + */ + public static void returnToPool(Logger log, CallableStatement cstmt, + Connection conn) throws YarnException { + if (cstmt != null) { + try { + cstmt.close(); + } catch (SQLException e) { + logAndThrowException(log, "Exception while trying to close Statement", + e); + } + } + + if (conn != null) { + try { + conn.close(); + } catch (SQLException e) { + logAndThrowException(log, "Exception while trying to close Connection", + e); + } + } + } + + /** + * Throws an exception due to an error in <code>FederationStateStore</code>. + * + * @param log the logger interface + * @param errMsg the error message + * @param t the throwable raised in the called class. + * @throws YarnException on failure + */ + public static void logAndThrowException(Logger log, String errMsg, + Throwable t) throws YarnException { + if (t != null) { + log.error(errMsg, t); + throw new YarnException(errMsg, t); + } else { + log.error(errMsg); + throw new YarnException(errMsg); + } + } + + /** + * Throws an <code>FederationStateStoreException</code> due to an error in + * <code>FederationStateStore</code>. + * + * @param log the logger interface + * @param code FederationStateStoreErrorCode of the error + * @param errMsg the error message + * @throws YarnException on failure + */ + public static void logAndThrowStoreException(Logger log, + FederationStateStoreErrorCode code, String errMsg) throws YarnException { + log.error(errMsg + " " + code.toString()); + throw new FederationStateStoreException(code); + } + + /** + * Throws an <code>FederationStateStoreException</code> due to an error in + * <code>FederationStateStore</code>. 
+ * + * @param log the logger interface + * @param code FederationStateStoreErrorCode of the error + * @throws YarnException on failure + */ + public static void logAndThrowStoreException(Logger log, + FederationStateStoreErrorCode code) throws YarnException { + log.error(code.toString()); + throw new FederationStateStoreException(code); + } + + /** + * Throws an <code>FederationStateStoreInvalidInputException</code> due to an + * error in <code>FederationStateStore</code>. + * + * @param log the logger interface + * @param errMsg the error message + * @throws YarnException on failure + */ + public static void logAndThrowInvalidInputException(Logger log, String errMsg) + throws YarnException { + LOG.error(errMsg); + throw new FederationStateStoreInvalidInputException(errMsg); + } + + /** + * Throws an <code>FederationStateStoreRetriableException</code> due to an + * error in <code>FederationStateStore</code>. + * + * @param log the logger interface + * @param errMsg the error message + * @param t the throwable raised in the called class. 
+ * @throws YarnException on failure + */ + public static void logAndThrowRetriableException(Logger log, String errMsg, + Throwable t) throws YarnException { + if (t != null) { + LOG.error(errMsg, t); + throw new FederationStateStoreRetriableException(errMsg, t); + } else { + LOG.error(errMsg); + throw new FederationStateStoreRetriableException(errMsg); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a01b06e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java index e8f245e..5693342 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java @@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreRetriableException; import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse; 
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; @@ -137,14 +138,32 @@ public final class FederationStateStoreFacade { initCache(); } + /** + * Create a RetryPolicy for {@code FederationStateStoreFacade}. In case of + * failure, it retries for: + * <ul> + * <li>{@code FederationStateStoreRetriableException}</li> + * <li>{@code CacheLoaderException}</li> + * </ul> + * + * @param conf the updated configuration + * @return the RetryPolicy for FederationStateStoreFacade + */ public static RetryPolicy createRetryPolicy(Configuration conf) { // Retry settings for StateStore - RetryPolicy retryPolicy = RetryPolicies.exponentialBackoffRetry( + RetryPolicy basePolicy = RetryPolicies.exponentialBackoffRetry( conf.getInt(YarnConfiguration.CLIENT_FAILOVER_RETRIES, Integer.SIZE), conf.getLong(YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS, YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS), TimeUnit.MILLISECONDS); - + Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap = + new HashMap<Class<? 
extends Exception>, RetryPolicy>(); + exceptionToPolicyMap.put(FederationStateStoreRetriableException.class, + basePolicy); + exceptionToPolicyMap.put(CacheLoaderException.class, basePolicy); + + RetryPolicy retryPolicy = RetryPolicies.retryByException( + RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap); return retryPolicy; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a01b06e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java index 63a5b65..80b00ef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java @@ -19,11 +19,14 @@ package org.apache.hadoop.yarn.server.federation.store.impl; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreErrorCode; +import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreException; 
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse; import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; @@ -67,9 +70,11 @@ public abstract class FederationStateStoreBaseTest { protected abstract FederationStateStore createStateStore(); + private Configuration conf; + @Before public void before() throws IOException, YarnException { - stateStore.init(new Configuration()); + stateStore.init(conf); } @After @@ -114,8 +119,10 @@ public abstract class FederationStateStoreBaseTest { try { stateStore.deregisterSubCluster(deregisterRequest); Assert.fail(); - } catch (YarnException e) { - Assert.assertTrue(e.getMessage().startsWith("SubCluster SC not found")); + } catch (FederationStateStoreException e) { + Assert.assertEquals( + FederationStateStoreErrorCode.MEMBERSHIP_UPDATE_DEREGISTER_FAIL, + e.getCode()); } } @@ -141,9 +148,10 @@ public abstract class FederationStateStoreBaseTest { try { stateStore.getSubCluster(request).getSubClusterInfo(); Assert.fail(); - } catch (YarnException e) { - Assert.assertTrue( - e.getMessage().startsWith("Subcluster SC does not exist")); + } catch (FederationStateStoreException e) { + Assert.assertEquals( + FederationStateStoreErrorCode.MEMBERSHIP_SINGLE_SELECT_FAIL, + e.getCode()); } } @@ -166,19 +174,25 @@ public abstract class FederationStateStoreBaseTest { stateStore.subClusterHeartbeat(SubClusterHeartbeatRequest.newInstance( subClusterId2, SubClusterState.SC_UNHEALTHY, "capability")); - Assert.assertTrue( - stateStore.getSubClusters(GetSubClustersInfoRequest.newInstance(true)) - .getSubClusters().contains(subClusterInfo1)); - Assert.assertFalse( + List<SubClusterInfo> subClustersActive = stateStore.getSubClusters(GetSubClustersInfoRequest.newInstance(true)) - .getSubClusters().contains(subClusterInfo2)); - - Assert.assertTrue( + .getSubClusters(); + 
List<SubClusterInfo> subClustersAll = stateStore.getSubClusters(GetSubClustersInfoRequest.newInstance(false)) - .getSubClusters().contains(subClusterInfo1)); - Assert.assertTrue( - stateStore.getSubClusters(GetSubClustersInfoRequest.newInstance(false)) - .getSubClusters().contains(subClusterInfo2)); + .getSubClusters(); + + // SC1 is the only active + Assert.assertEquals(1, subClustersActive.size()); + SubClusterInfo sc1 = subClustersActive.get(0); + Assert.assertEquals(subClusterId1, sc1.getSubClusterId()); + + // SC1 and SC2 are the SubCluster present into the StateStore + + Assert.assertEquals(2, subClustersAll.size()); + Assert.assertTrue(subClustersAll.contains(sc1)); + subClustersAll.remove(sc1); + SubClusterInfo sc2 = subClustersAll.get(0); + Assert.assertEquals(subClusterId2, sc2.getSubClusterId()); } @Test @@ -204,9 +218,10 @@ public abstract class FederationStateStoreBaseTest { try { stateStore.subClusterHeartbeat(heartbeatRequest); Assert.fail(); - } catch (YarnException e) { - Assert.assertTrue(e.getMessage() - .startsWith("Subcluster SC does not exist; cannot heartbeat")); + } catch (FederationStateStoreException e) { + Assert.assertEquals( + FederationStateStoreErrorCode.MEMBERSHIP_UPDATE_HEARTBEAT_FAIL, + e.getCode()); } } @@ -265,9 +280,10 @@ public abstract class FederationStateStoreBaseTest { try { queryApplicationHomeSC(appId); Assert.fail(); - } catch (YarnException e) { - Assert.assertTrue(e.getMessage() - .startsWith("Application " + appId + " does not exist")); + } catch (FederationStateStoreException e) { + Assert.assertEquals( + FederationStateStoreErrorCode.APPLICATIONS_SINGLE_SELECT_FAIL, + e.getCode()); } } @@ -281,9 +297,9 @@ public abstract class FederationStateStoreBaseTest { try { stateStore.deleteApplicationHomeSubCluster(delRequest); Assert.fail(); - } catch (YarnException e) { - Assert.assertTrue(e.getMessage() - .startsWith("Application " + appId.toString() + " does not exist")); + } catch (FederationStateStoreException e) { + 
Assert.assertEquals( + FederationStateStoreErrorCode.APPLICATIONS_DELETE_FAIL, e.getCode()); } } @@ -314,9 +330,10 @@ public abstract class FederationStateStoreBaseTest { try { stateStore.getApplicationHomeSubCluster(request); Assert.fail(); - } catch (YarnException e) { - Assert.assertTrue(e.getMessage() - .startsWith("Application " + appId.toString() + " does not exist")); + } catch (FederationStateStoreException e) { + Assert.assertEquals( + FederationStateStoreErrorCode.APPLICATIONS_SINGLE_SELECT_FAIL, + e.getCode()); } } @@ -379,9 +396,9 @@ public abstract class FederationStateStoreBaseTest { try { stateStore.updateApplicationHomeSubCluster((updateRequest)); Assert.fail(); - } catch (YarnException e) { - Assert.assertTrue(e.getMessage() - .startsWith("Application " + appId.toString() + " does not exist")); + } catch (FederationStateStoreException e) { + Assert.assertEquals( + FederationStateStoreErrorCode.APPLICATIONS_UPDATE_FAIL, e.getCode()); } } @@ -440,9 +457,9 @@ public abstract class FederationStateStoreBaseTest { try { stateStore.getPolicyConfiguration(request); Assert.fail(); - } catch (YarnException e) { - Assert.assertTrue( - e.getMessage().startsWith("Policy for queue Queue does not exist")); + } catch (FederationStateStoreException e) { + Assert.assertEquals( + FederationStateStoreErrorCode.POLICY_SINGLE_SELECT_FAIL, e.getCode()); } } @@ -537,4 +554,8 @@ public abstract class FederationStateStoreBaseTest { return result.getPolicyConfiguration(); } + protected void setConf(Configuration conf) { + this.conf = conf; + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a01b06e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java index 74404c7..64adab8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java @@ -17,6 +17,7 @@ package org.apache.hadoop.yarn.server.federation.store.impl; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; /** @@ -27,6 +28,7 @@ public class TestMemoryFederationStateStore @Override protected FederationStateStore createStateStore() { + super.setConf(new Configuration()); return new MemoryFederationStateStore(); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a01b06e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/utils/TestFederationStateStoreInputValidator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/utils/TestFederationStateStoreInputValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/utils/TestFederationStateStoreInputValidator.java index b95f17a..8ac5e81 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/utils/TestFederationStateStoreInputValidator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/utils/TestFederationStateStoreInputValidator.java @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.federation.store.utils; import java.nio.ByteBuffer; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException; import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest; http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a01b06e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacadeRetry.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacadeRetry.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacadeRetry.java new file mode 100644 index 0000000..632e865 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacadeRetry.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license 
agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.utils; + +import javax.cache.integration.CacheLoaderException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.io.retry.RetryPolicy.RetryAction; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreErrorCode; +import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreException; +import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException; +import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreRetriableException; +import org.junit.Assert; +import org.junit.Test; + +/** + * Test class to validate FederationStateStoreFacade retry policy. + */ +public class TestFederationStateStoreFacadeRetry { + + private int maxRetries = 4; + private Configuration conf; + + /* + * Test to validate that FederationStateStoreRetriableException is a retriable + * exception. 
+ */ + @Test + public void testFacadeRetriableException() throws Exception { + conf = new Configuration(); + conf.setInt(YarnConfiguration.CLIENT_FAILOVER_RETRIES, maxRetries); + RetryPolicy policy = FederationStateStoreFacade.createRetryPolicy(conf); + RetryAction action = policy.shouldRetry( + new FederationStateStoreRetriableException(""), 0, 0, false); + // Compare only the action field; this test does not pin the delay and the + // reason carried by the returned RetryAction + Assert.assertEquals(RetryAction.RETRY.action, action.action); + + // Once the retry count reaches maxRetries the policy must stop retrying + action = policy.shouldRetry(new FederationStateStoreRetriableException(""), + maxRetries, 0, false); + Assert.assertEquals(RetryAction.FAIL.action, action.action); + } + + /* + * Test to validate that YarnException is not a retriable exception. + */ + @Test + public void testFacadeYarnException() throws Exception { + + conf = new Configuration(); + conf.setInt(YarnConfiguration.CLIENT_FAILOVER_RETRIES, maxRetries); + RetryPolicy policy = FederationStateStoreFacade.createRetryPolicy(conf); + RetryAction action = policy.shouldRetry(new YarnException(), 0, 0, false); + Assert.assertEquals(RetryAction.FAIL.action, action.action); + } + + /* + * Test to validate that FederationStateStoreException is not a retriable + * exception. + */ + @Test + public void testFacadeStateStoreException() throws Exception { + conf = new Configuration(); + conf.setInt(YarnConfiguration.CLIENT_FAILOVER_RETRIES, maxRetries); + RetryPolicy policy = FederationStateStoreFacade.createRetryPolicy(conf); + RetryAction action = policy.shouldRetry( + new FederationStateStoreException( + FederationStateStoreErrorCode.APPLICATIONS_INSERT_FAIL), + 0, 0, false); + Assert.assertEquals(RetryAction.FAIL.action, action.action); + } + + /* + * Test to validate that FederationStateStoreInvalidInputException is not a + * retriable exception. 
+ */ + @Test + public void testFacadeInvalidInputException() throws Exception { + conf = new Configuration(); + conf.setInt(YarnConfiguration.CLIENT_FAILOVER_RETRIES, maxRetries); + RetryPolicy policy = FederationStateStoreFacade.createRetryPolicy(conf); + RetryAction action = policy.shouldRetry( + new FederationStateStoreInvalidInputException(""), 0, 0, false); + Assert.assertEquals(RetryAction.FAIL.action, action.action); + } + + /* + * Test to validate that CacheLoaderException is a retriable exception. + */ + @Test + public void testFacadeCacheRetriableException() throws Exception { + conf = new Configuration(); + conf.setInt(YarnConfiguration.CLIENT_FAILOVER_RETRIES, maxRetries); + RetryPolicy policy = FederationStateStoreFacade.createRetryPolicy(conf); + RetryAction action = + policy.shouldRetry(new CacheLoaderException(""), 0, 0, false); + // Compare only the action field; this test does not pin the delay and the + // reason carried by the returned RetryAction + Assert.assertEquals(RetryAction.RETRY.action, action.action); + + // Once the retry count reaches maxRetries the policy must stop retrying + action = + policy.shouldRetry(new CacheLoaderException(""), maxRetries, 0, false); + Assert.assertEquals(RetryAction.FAIL.action, action.action); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org