YARN-5406. In-memory based implementation of the FederationMembershipStateStore. Contributed by Ellen Hui.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c679e11c Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c679e11c Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c679e11c Branch: refs/heads/YARN-2915 Commit: c679e11c3a51ba2489d9692f2aa15fb73a6a2c21 Parents: c718aff Author: Subru Krishnan <su...@apache.org> Authored: Thu Aug 4 15:54:38 2016 -0700 Committer: Carlo Curino <cur...@apache.org> Committed: Tue May 16 08:52:36 2017 -0700 ---------------------------------------------------------------------- .../store/impl/MemoryFederationStateStore.java | 138 ++++++++++++ .../federation/store/impl/package-info.java | 17 ++ .../records/GetSubClustersInfoRequest.java | 4 + .../store/records/SubClusterState.java | 4 + .../impl/FederationStateStoreBaseTest.java | 221 +++++++++++++++++++ .../impl/TestMemoryFederationStateStore.java | 49 ++++ 6 files changed, 433 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/c679e11c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java new file mode 100644 index 0000000..7fdc4a9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.store.impl; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.store.FederationMembershipStateStore; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatResponse; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse; +import org.apache.hadoop.yarn.server.records.Version; +import org.apache.hadoop.yarn.util.MonotonicClock; + +import com.google.common.annotations.VisibleForTesting; + +/** + * In-memory implementation of FederationMembershipStateStore. + */ +public class MemoryFederationStateStore + implements FederationMembershipStateStore { + + private final Map<SubClusterId, SubClusterInfo> membership = + new ConcurrentHashMap<SubClusterId, SubClusterInfo>(); + private final MonotonicClock clock = new MonotonicClock(); + + @Override + public Version getMembershipStateStoreVersion() { + return null; + } + + @Override + public SubClusterRegisterResponse registerSubCluster( + SubClusterRegisterRequest request) throws YarnException { + SubClusterInfo subClusterInfo = request.getSubClusterInfo(); + subClusterInfo.setLastStartTime(clock.getTime()); + membership.put(subClusterInfo.getSubClusterId(), subClusterInfo); + return SubClusterRegisterResponse.newInstance(); + } + + @Override + public SubClusterDeregisterResponse deregisterSubCluster( + SubClusterDeregisterRequest request) throws YarnException { + SubClusterInfo subClusterInfo = membership.get(request.getSubClusterId()); + if (subClusterInfo == null) { + throw new YarnException( + "SubCluster " + request.getSubClusterId().toString() + " not found"); + } else { + subClusterInfo.setState(request.getState()); + } + + return SubClusterDeregisterResponse.newInstance(); + } + + @Override + public SubClusterHeartbeatResponse subClusterHeartbeat( + SubClusterHeartbeatRequest request) throws YarnException { + + SubClusterId subClusterId = request.getSubClusterId(); + SubClusterInfo subClusterInfo = membership.get(subClusterId); + + if (subClusterInfo == null) { + throw new YarnException("Subcluster " + subClusterId.toString() + + " does not exist; cannot heartbeat"); + } + + subClusterInfo.setLastHeartBeat(clock.getTime()); + subClusterInfo.setState(request.getState()); + subClusterInfo.setCapability(request.getCapability()); + + return SubClusterHeartbeatResponse.newInstance(); + } + + @Override + public GetSubClusterInfoResponse getSubCluster( + GetSubClusterInfoRequest request) throws YarnException { + SubClusterId subClusterId = request.getSubClusterId(); + if (!membership.containsKey(subClusterId)) { + throw new YarnException( + "Subcluster " + subClusterId.toString() + " does not exist"); + } + + return GetSubClusterInfoResponse.newInstance(membership.get(subClusterId)); + } + + @Override + public GetSubClustersInfoResponse getSubClusters( + GetSubClustersInfoRequest request) throws YarnException { + List<SubClusterInfo> result = new ArrayList<SubClusterInfo>(); + + for (SubClusterInfo info : membership.values()) { + if (!request.getFilterInactiveSubClusters() + || info.getState().isActive()) { + result.add(info); + } + } + return GetSubClustersInfoResponse.newInstance(result); + + } + + @VisibleForTesting + public Map<SubClusterId, SubClusterInfo> getMembershipTable() { + return membership; + } + + @VisibleForTesting + public void clearMembershipTable() { + membership.clear(); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c679e11c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/package-info.java new file mode 100644 index 0000000..56e1274 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/package-info.java @@ -0,0 +1,17 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.yarn.server.federation.store.impl; http://git-wip-us.apache.org/repos/asf/hadoop/blob/c679e11c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClustersInfoRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClustersInfoRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClustersInfoRequest.java index 3264d81..90d2f99 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClustersInfoRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClustersInfoRequest.java @@ -26,6 +26,10 @@ import org.apache.hadoop.yarn.util.Records; /** * Request class to obtain information about all sub-clusters that are * participating in federation. + * + * If filterInactiveSubClusters is set to true, only active sub-clusters will be + * returned; otherwise, all sub-clusters will be returned regardless of state. + * By default, filterInactiveSubClusters is true. */ @Private @Unstable http://git-wip-us.apache.org/repos/asf/hadoop/blob/c679e11c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterState.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterState.java index 22cec99..ff49aaa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterState.java @@ -53,6 +53,10 @@ public enum SubClusterState { return (this != SC_RUNNING && this != SC_NEW); } + public boolean isActive() { + return this == SC_RUNNING; + } + public boolean isFinal() { return (this == SC_UNREGISTERED || this == SC_DECOMMISSIONED || this == SC_LOST); http://git-wip-us.apache.org/repos/asf/hadoop/blob/c679e11c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java new file mode 100644 index 0000000..7eb1c86 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java @@ -0,0 +1,221 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.store.impl; + +import java.io.IOException; + +import org.junit.Before; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.store.FederationMembershipStateStore; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse; +import org.apache.hadoop.yarn.util.MonotonicClock; +import org.junit.Assert; +import org.junit.Test; + +/** + * Base class for FederationMembershipStateStore implementations. + */ +public abstract class FederationStateStoreBaseTest { + + static final Logger LOG = + LoggerFactory.getLogger(FederationStateStoreBaseTest.class); + private static final MonotonicClock CLOCK = new MonotonicClock(); + + private FederationMembershipStateStore stateStore = getStateStore(); + + @Before + public void before() throws IOException { + clearMembership(); + } + + @Test + public void testRegisterSubCluster() throws Exception { + SubClusterId subClusterId = SubClusterId.newInstance("SC"); + SubClusterInfo subClusterInfo = createSubClusterInfo(subClusterId); + + SubClusterRegisterResponse result = stateStore.registerSubCluster( + SubClusterRegisterRequest.newInstance(subClusterInfo)); + Map<SubClusterId, SubClusterInfo> membership = getMembership(); + + Assert.assertNotNull(membership.get(subClusterId)); + Assert.assertNotNull(result); + Assert.assertEquals(subClusterInfo, membership.get(subClusterId)); + } + + @Test + public void testDeregisterSubCluster() throws Exception { + SubClusterId subClusterId = SubClusterId.newInstance("SC"); + SubClusterInfo subClusterInfo = createSubClusterInfo(subClusterId); + + stateStore.registerSubCluster( + SubClusterRegisterRequest.newInstance(subClusterInfo)); + + SubClusterDeregisterRequest deregisterRequest = SubClusterDeregisterRequest + .newInstance(subClusterId, SubClusterState.SC_UNREGISTERED); + + stateStore.deregisterSubCluster(deregisterRequest); + + Map<SubClusterId, SubClusterInfo> membership = getMembership(); + Assert.assertNotNull(membership.get(subClusterId)); + Assert.assertEquals(membership.get(subClusterId).getState(), + SubClusterState.SC_UNREGISTERED); + } + + @Test + public void testDeregisterSubClusterUnknownSubCluster() throws Exception { + SubClusterId subClusterId = SubClusterId.newInstance("SC"); + + SubClusterDeregisterRequest deregisterRequest = SubClusterDeregisterRequest + .newInstance(subClusterId, SubClusterState.SC_UNREGISTERED); + try { + stateStore.deregisterSubCluster(deregisterRequest); + Assert.fail(); + } catch (YarnException e) { + Assert.assertTrue(e.getMessage().startsWith("SubCluster SC not found")); + } + } + + @Test + public void testGetSubClusterInfo() throws Exception { + + SubClusterId subClusterId = SubClusterId.newInstance("SC"); + SubClusterInfo subClusterInfo = createSubClusterInfo(subClusterId); + + stateStore.registerSubCluster( + SubClusterRegisterRequest.newInstance(subClusterInfo)); + + GetSubClusterInfoRequest request = + GetSubClusterInfoRequest.newInstance(subClusterId); + Assert.assertEquals(subClusterInfo, + stateStore.getSubCluster(request).getSubClusterInfo()); + } + + @Test + public void testGetSubClusterInfoUnknownSubCluster() throws Exception { + SubClusterId subClusterId = SubClusterId.newInstance("SC"); + GetSubClusterInfoRequest request = + GetSubClusterInfoRequest.newInstance(subClusterId); + + try { + stateStore.getSubCluster(request).getSubClusterInfo(); + Assert.fail(); + } catch (YarnException e) { + Assert.assertTrue( + e.getMessage().startsWith("Subcluster SC does not exist")); + } + } + + @Test + public void testGetAllSubClustersInfo() throws Exception { + + SubClusterId subClusterId1 = SubClusterId.newInstance("SC1"); + SubClusterInfo subClusterInfo1 = createSubClusterInfo(subClusterId1); + + SubClusterId subClusterId2 = SubClusterId.newInstance("SC2"); + SubClusterInfo subClusterInfo2 = createSubClusterInfo(subClusterId2); + + stateStore.registerSubCluster( + SubClusterRegisterRequest.newInstance(subClusterInfo1)); + stateStore.registerSubCluster( + SubClusterRegisterRequest.newInstance(subClusterInfo2)); + + stateStore.subClusterHeartbeat(SubClusterHeartbeatRequest + .newInstance(subClusterId1, SubClusterState.SC_RUNNING, "")); + stateStore.subClusterHeartbeat(SubClusterHeartbeatRequest + .newInstance(subClusterId2, SubClusterState.SC_UNHEALTHY, "")); + + Assert.assertTrue( + stateStore.getSubClusters(GetSubClustersInfoRequest.newInstance(true)) + .getSubClusters().contains(subClusterInfo1)); + Assert.assertFalse( + stateStore.getSubClusters(GetSubClustersInfoRequest.newInstance(true)) + .getSubClusters().contains(subClusterInfo2)); + + Assert.assertTrue( + stateStore.getSubClusters(GetSubClustersInfoRequest.newInstance(false)) + .getSubClusters().contains(subClusterInfo1)); + Assert.assertTrue( + stateStore.getSubClusters(GetSubClustersInfoRequest.newInstance(false)) + .getSubClusters().contains(subClusterInfo2)); + } + + @Test + public void testSubClusterHeartbeat() throws Exception { + SubClusterId subClusterId = SubClusterId.newInstance("SC"); + SubClusterInfo subClusterInfo = createSubClusterInfo(subClusterId); + + stateStore.registerSubCluster( + SubClusterRegisterRequest.newInstance(subClusterInfo)); + + SubClusterHeartbeatRequest heartbeatRequest = SubClusterHeartbeatRequest + .newInstance(subClusterId, SubClusterState.SC_RUNNING, "cabability"); + stateStore.subClusterHeartbeat(heartbeatRequest); + + Map<SubClusterId, SubClusterInfo> membership = getMembership(); + Assert.assertEquals(membership.get(subClusterId).getState(), + SubClusterState.SC_RUNNING); + Assert.assertNotNull(membership.get(subClusterId).getLastHeartBeat()); + } + + @Test + public void testSubClusterHeartbeatUnknownSubCluster() throws Exception { + SubClusterId subClusterId = SubClusterId.newInstance("SC"); + SubClusterHeartbeatRequest heartbeatRequest = SubClusterHeartbeatRequest + .newInstance(subClusterId, SubClusterState.SC_RUNNING, "cabability"); + + try { + stateStore.subClusterHeartbeat(heartbeatRequest); + Assert.fail(); + } catch (YarnException e) { + Assert.assertTrue(e.getMessage() + .startsWith("Subcluster SC does not exist; cannot heartbeat")); + } + } + + private SubClusterInfo createSubClusterInfo(SubClusterId subClusterId) { + + String amRMAddress = "1.2.3.4:1"; + String clientRMAddress = "1.2.3.4:2"; + String rmAdminAddress = "1.2.3.4:3"; + String webAppAddress = "1.2.3.4:4"; + + return SubClusterInfo.newInstance(subClusterId, amRMAddress, + clientRMAddress, rmAdminAddress, webAppAddress, SubClusterState.SC_NEW, + CLOCK.getTime(), "cabability"); + } + + protected abstract Map<SubClusterId, SubClusterInfo> getMembership(); + + protected abstract void clearMembership(); + + protected abstract FederationMembershipStateStore getStateStore(); + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c679e11c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java new file mode 100644 index 0000000..b74ffbd --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.store.impl; + +import java.util.Map; + +import org.apache.hadoop.yarn.server.federation.store.FederationMembershipStateStore; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; + +/** + * Unit tests for MemoryFederationStateStore. + */ +public class TestMemoryFederationStateStore + extends FederationStateStoreBaseTest { + + private static final MemoryFederationStateStore STATESTORE = + new MemoryFederationStateStore(); + + @Override + protected Map<SubClusterId, SubClusterInfo> getMembership() { + return STATESTORE.getMembershipTable(); + } + + @Override + protected void clearMembership() { + STATESTORE.clearMembershipTable(); + } + + @Override + protected FederationMembershipStateStore getStateStore() { + return STATESTORE; + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org