This is an automated email from the ASF dual-hosted git repository. jxue pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/helix.git
commit de572a6192f4186cc1cabe8ee8ceb38a7d3dc615 Author: Qi (Quincy) Qu <[email protected]> AuthorDate: Wed Feb 16 12:55:56 2022 -0500 Add rest endpoint for virtual topology group (#1958) --- .../rest/server/resources/AbstractResource.java | 1 + .../server/resources/helix/ClusterAccessor.java | 25 ++++ .../helix/rest/server/TestClusterAccessor.java | 156 ++++++++++++++++----- 3 files changed, 147 insertions(+), 35 deletions(-) diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java index a709d6a..b7d02a7 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java @@ -59,6 +59,7 @@ public class AbstractResource { public enum Command { activate, addInstanceTag, + addVirtualTopologyGroup, expand, enable, disable, diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java index cf2457c..daea45f 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java @@ -70,6 +70,7 @@ import org.apache.helix.rest.common.HttpConstants; import org.apache.helix.rest.server.json.cluster.ClusterTopology; import org.apache.helix.rest.server.service.ClusterService; import org.apache.helix.rest.server.service.ClusterServiceImpl; +import org.apache.helix.rest.server.service.VirtualTopologyGroupService; import org.apache.helix.tools.ClusterSetup; import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; import org.apache.helix.zookeeper.datamodel.ZNRecord; @@ -235,6 +236,21 @@ public class ClusterAccessor extends AbstractHelixResource { } break; + case addVirtualTopologyGroup: + try { + addVirtualTopologyGroup(clusterId, content); + } catch (JsonProcessingException ex) { + LOG.error("Failed to parse json string: {}", content, ex); + return badRequest("Invalid payload json body: " + content); + } catch (IllegalArgumentException ex) { + LOG.error("Illegal input {} for command {}.", content, command, ex); + return badRequest(String.format("Illegal input %s for command %s", content, command)); + } catch (Exception ex) { + LOG.error("Failed to add virtual topology group to cluster {}", clusterId, ex); + return serverError(ex); + } + break; + case expand: try { clusterSetup.expandCluster(clusterId); @@ -305,6 +321,15 @@ public class ClusterAccessor extends AbstractHelixResource { return OK(); } + private void addVirtualTopologyGroup(String clusterId, String content) throws JsonProcessingException { + ClusterService clusterService = new ClusterServiceImpl(getDataAccssor(clusterId), getConfigAccessor()); + VirtualTopologyGroupService service = new VirtualTopologyGroupService( + getHelixAdmin(), clusterService, getConfigAccessor(), getDataAccssor(clusterId)); + Map<String, String> customFieldsMap = + OBJECT_MAPPER.readValue(content, new TypeReference<HashMap<String, String>>() { }); + service.addVirtualTopologyGroup(clusterId, customFieldsMap); + } + @ResponseMetered(name = HttpConstants.READ_REQUEST) @Timed(name = HttpConstants.READ_REQUEST) @GET diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java index 3bb3c29..34cfbaf 100644 --- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java +++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java @@ -19,6 +19,7 @@ package org.apache.helix.rest.server; * under the License. */ +import com.fasterxml.jackson.core.JsonProcessingException; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -45,6 +46,7 @@ import org.apache.helix.api.status.ClusterManagementMode; import org.apache.helix.api.status.ClusterManagementModeRequest; import org.apache.helix.cloud.azure.AzureConstants; import org.apache.helix.cloud.constants.CloudProvider; +import org.apache.helix.cloud.constants.VirtualTopologyGroupConstants; import org.apache.helix.controller.rebalancer.waged.WagedRebalancer; import org.apache.helix.integration.manager.ClusterDistributedController; import org.apache.helix.manager.zk.ZKHelixDataAccessor; @@ -70,10 +72,13 @@ import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.testng.Assert; import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; public class TestClusterAccessor extends AbstractTestClass { + private static final String VG_CLUSTER = "vgCluster"; + @BeforeClass public void beforeClass() { for (String cluster : _clusters) { @@ -167,10 +172,7 @@ public class TestClusterAccessor extends AbstractTestClass { updateClusterConfigFromRest(cluster, configDelta, Command.update); //get valid cluster topology map - String topologyMapDef = get(topologyMapUrlBase, null, Response.Status.OK.getStatusCode(), true); - Map<String, Object> topologyMap = - OBJECT_MAPPER.readValue(topologyMapDef, new TypeReference<HashMap<String, Object>>() { - }); + Map<String, Object> topologyMap = getMapResponseFromRest(topologyMapUrlBase); Assert.assertEquals(topologyMap.size(), 2); Assert.assertTrue(topologyMap.get("/helixZoneId:zone0") instanceof List); List<String> instances = (List<String>) topologyMap.get("/helixZoneId:zone0"); @@ -197,10 +199,7 @@ public class TestClusterAccessor extends AbstractTestClass { updateClusterConfigFromRest(cluster, configDelta, Command.update); //get valid cluster fault zone map - String faultZoneMapDef = get(faultZoneUrlBase, null, Response.Status.OK.getStatusCode(), true); - Map<String, Object> faultZoneMap = - OBJECT_MAPPER.readValue(faultZoneMapDef, new TypeReference<HashMap<String, Object>>() { - }); + Map<String, Object> faultZoneMap = getMapResponseFromRest(faultZoneUrlBase); Assert.assertEquals(faultZoneMap.size(), 2); Assert.assertTrue(faultZoneMap.get("/helixZoneId:zone0") instanceof List); instances = (List<String>) faultZoneMap.get("/helixZoneId:zone0"); @@ -223,6 +222,108 @@ public class TestClusterAccessor extends AbstractTestClass { "/instance:TestCluster_1localhost_12927")))); } + @Test(dataProvider = "prepareVirtualTopologyTests", dependsOnMethods = "testGetClusters") + public void testAddVirtualTopologyGroup(String requestParam, int numGroups, + Map<String, String> instanceToGroup) throws IOException { + post("clusters/" + VG_CLUSTER, + ImmutableMap.of("command", "addVirtualTopologyGroup"), + Entity.entity(requestParam, MediaType.APPLICATION_JSON_TYPE), + Response.Status.OK.getStatusCode()); + Map<String, Object> topology = getMapResponseFromRest(String.format("clusters/%s/topology", VG_CLUSTER)); + Assert.assertTrue(topology.containsKey("zones")); + Assert.assertEquals(((List) topology.get("zones")).size(), numGroups); + + ClusterConfig clusterConfig = getClusterConfigFromRest(VG_CLUSTER); + String expectedTopology = "/" + VirtualTopologyGroupConstants.VIRTUAL_FAULT_ZONE_TYPE + "/hostname"; + Assert.assertEquals(clusterConfig.getTopology(), expectedTopology); + Assert.assertEquals(clusterConfig.getFaultZoneType(), VirtualTopologyGroupConstants.VIRTUAL_FAULT_ZONE_TYPE); + + HelixDataAccessor helixDataAccessor = new ZKHelixDataAccessor(VG_CLUSTER, _baseAccessor); + for (Map.Entry<String, String> entry : instanceToGroup.entrySet()) { + InstanceConfig instanceConfig = + helixDataAccessor.getProperty(helixDataAccessor.keyBuilder().instanceConfig(entry.getKey())); + String expectedGroup = entry.getValue(); + Assert.assertEquals(instanceConfig.getDomainAsMap().get(VirtualTopologyGroupConstants.VIRTUAL_FAULT_ZONE_TYPE), + expectedGroup); + } + } + + @Test(dependsOnMethods = "testGetClusters") + public void testVirtualTopologyGroupMaintenanceMode() throws JsonProcessingException { + setupClusterForVirtualTopology(VG_CLUSTER); + String requestParam = "{\"virtualTopologyGroupNumber\":\"7\",\"virtualTopologyGroupName\":\"vgTest\"," + + "\"autoMaintenanceModeDisabled\":\"true\"}"; + // expect failure as cluster is not in maintenance mode while autoMaintenanceModeDisabled=true + post("clusters/" + VG_CLUSTER, + ImmutableMap.of("command", "addVirtualTopologyGroup"), + Entity.entity(requestParam, MediaType.APPLICATION_JSON_TYPE), + Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()); + // enable maintenance mode and expect success + post("clusters/" + VG_CLUSTER, + ImmutableMap.of("command", "enableMaintenanceMode"), + Entity.entity("virtual group", MediaType.APPLICATION_JSON_TYPE), + Response.Status.OK.getStatusCode()); + post("clusters/" + VG_CLUSTER, + ImmutableMap.of("command", "addVirtualTopologyGroup"), + Entity.entity(requestParam, MediaType.APPLICATION_JSON_TYPE), + Response.Status.OK.getStatusCode()); + + Assert.assertTrue(isMaintenanceModeEnabled(VG_CLUSTER)); + } + + private boolean isMaintenanceModeEnabled(String clusterName) throws JsonProcessingException { + String body = + get("clusters/" + clusterName + "/maintenance", null, Response.Status.OK.getStatusCode(), true); + return OBJECT_MAPPER.readTree(body).get(ClusterAccessor.ClusterProperties.maintenance.name()).booleanValue(); + } + + @DataProvider + public Object[][] prepareVirtualTopologyTests() { + setupClusterForVirtualTopology(VG_CLUSTER); + String test1 = "{\"virtualTopologyGroupNumber\":\"7\",\"virtualTopologyGroupName\":\"vgTest\"}"; + String test2 = "{\"virtualTopologyGroupNumber\":\"9\",\"virtualTopologyGroupName\":\"vgTest\"}"; + return new Object[][] { + {test1, 7, ImmutableMap.of( + "vgCluster_localhost_12918", "vgTest_0", + "vgCluster_localhost_12919", "vgTest_0", + "vgCluster_localhost_12925", "vgTest_4", + "vgCluster_localhost_12927", "vgTest_6")}, + {test2, 9, ImmutableMap.of( + "vgCluster_localhost_12918", "vgTest_0", + "vgCluster_localhost_12919", "vgTest_0", + "vgCluster_localhost_12925", "vgTest_6", + "vgCluster_localhost_12927", "vgTest_8")}, + // repeat test1 for deterministic and test for decreasing numGroups + {test1, 7, ImmutableMap.of( + "vgCluster_localhost_12918", "vgTest_0", + "vgCluster_localhost_12919", "vgTest_0", + "vgCluster_localhost_12925", "vgTest_4", + "vgCluster_localhost_12927", "vgTest_6")} + }; + } + + private void setupClusterForVirtualTopology(String clusterName) { + HelixDataAccessor helixDataAccessor = new ZKHelixDataAccessor(clusterName, _baseAccessor); + ZNRecord record = new ZNRecord("testZnode"); + record.setBooleanField(CloudConfig.CloudConfigProperty.CLOUD_ENABLED.name(), true); + record.setSimpleField(CloudConfig.CloudConfigProperty.CLOUD_ID.name(), "TestCloudID"); + record.setSimpleField(CloudConfig.CloudConfigProperty.CLOUD_PROVIDER.name(), CloudProvider.AZURE.name()); + CloudConfig cloudConfig = new CloudConfig.Builder(record).build(); + _gSetupTool.addCluster(clusterName, true, cloudConfig); + + Set<String> instances = new HashSet<>(); + for (int i = 0; i < 10; i++) { + String instanceName = clusterName + "_localhost_" + (12918 + i); + _gSetupTool.addInstanceToCluster(clusterName, instanceName); + InstanceConfig instanceConfig = + helixDataAccessor.getProperty(helixDataAccessor.keyBuilder().instanceConfig(instanceName)); + instanceConfig.setDomain("faultDomain=" + i / 2 + ",hostname=" + instanceName); + helixDataAccessor.setProperty(helixDataAccessor.keyBuilder().instanceConfig(instanceName), instanceConfig); + instances.add(instanceName); + } + startInstances(clusterName, instances, 10); + } + @Test(dependsOnMethods = "testGetClusterTopologyAndFaultZoneMap") public void testAddConfigFields() throws IOException { System.out.println("Start test :" + TestHelper.getTestMethodName()); @@ -399,19 +500,11 @@ public class TestClusterAccessor extends AbstractTestClass { Entity.entity(reason, MediaType.APPLICATION_JSON_TYPE), Response.Status.OK.getStatusCode()); // verify is in maintenance mode - String body = - get("clusters/" + cluster + "/maintenance", null, Response.Status.OK.getStatusCode(), true); - JsonNode node = OBJECT_MAPPER.readTree(body); - boolean maintenance = - node.get(ClusterAccessor.ClusterProperties.maintenance.name()).booleanValue(); - Assert.assertTrue(maintenance); + Assert.assertTrue(isMaintenanceModeEnabled(cluster)); // Check that we could retrieve maintenance signal correctly - String signal = get("clusters/" + cluster + "/controller/maintenanceSignal", null, - Response.Status.OK.getStatusCode(), true); Map<String, Object> maintenanceSignalMap = - OBJECT_MAPPER.readValue(signal, new TypeReference<HashMap<String, Object>>() { - }); + getMapResponseFromRest("clusters/" + cluster + "/controller/maintenanceSignal"); Assert.assertEquals(maintenanceSignalMap.get("TRIGGERED_BY"), "USER"); Assert.assertEquals(maintenanceSignalMap.get("REASON"), reason); Assert.assertNotNull(maintenanceSignalMap.get("TIMESTAMP")); @@ -422,10 +515,7 @@ public class TestClusterAccessor extends AbstractTestClass { Entity.entity("", MediaType.APPLICATION_JSON_TYPE), Response.Status.OK.getStatusCode()); // verify no longer in maintenance mode - body = get("clusters/" + cluster + "/maintenance", null, Response.Status.OK.getStatusCode(), true); - node = OBJECT_MAPPER.readTree(body); - Assert.assertFalse( - node.get(ClusterAccessor.ClusterProperties.maintenance.name()).booleanValue()); + Assert.assertFalse(isMaintenanceModeEnabled(cluster)); get("clusters/" + cluster + "/controller/maintenanceSignal", null, Response.Status.NOT_FOUND.getStatusCode(), false); @@ -448,11 +538,8 @@ public class TestClusterAccessor extends AbstractTestClass { Assert.assertNotNull(leader, "Leader name cannot be null!"); // Get the controller leadership history JSON's last entry - String leadershipHistory = get("clusters/" + cluster + "/controller/history", null, - Response.Status.OK.getStatusCode(), true); - Map<String, Object> leadershipHistoryMap = - OBJECT_MAPPER.readValue(leadershipHistory, new TypeReference<HashMap<String, Object>>() { - }); + Map<String, Object> leadershipHistoryMap = getMapResponseFromRest("clusters/" + cluster + "/controller/history"); + Assert.assertNotNull(leadershipHistoryMap, "Leadership history cannot be null!"); Object leadershipHistoryList = leadershipHistoryMap.get(AbstractResource.Properties.history.name()); @@ -477,11 +564,8 @@ public class TestClusterAccessor extends AbstractTestClass { Entity.entity(reason, MediaType.APPLICATION_JSON_TYPE), Response.Status.OK.getStatusCode()); // Get the maintenance history JSON's last entry - String maintenanceHistory = get("clusters/" + cluster + "/controller/maintenanceHistory", null, - Response.Status.OK.getStatusCode(), true); Map<String, Object> maintenanceHistoryMap = - OBJECT_MAPPER.readValue(maintenanceHistory, new TypeReference<HashMap<String, Object>>() { - }); + getMapResponseFromRest("clusters/" + cluster + "/controller/maintenanceHistory"); Object maintenanceHistoryList = maintenanceHistoryMap.get(ClusterAccessor.ClusterProperties.maintenanceHistory.name()); Assert.assertNotNull(maintenanceHistoryList); @@ -571,10 +655,7 @@ public class TestClusterAccessor extends AbstractTestClass { System.out.println("Start test :" + TestHelper.getTestMethodName()); String cluster = "TestCluster_1"; String urlBase = "clusters/TestCluster_1/statemodeldefs/"; - String stateModelDefs = - get(urlBase, null, Response.Status.OK.getStatusCode(), true); - Map<String, Object> defMap = OBJECT_MAPPER.readValue(stateModelDefs, new TypeReference<HashMap<String, Object>>() { - }); + Map<String, Object> defMap = getMapResponseFromRest(urlBase); Assert.assertTrue(defMap.size() == 2); Assert.assertTrue(defMap.get("stateModelDefinitions") instanceof List); @@ -1427,4 +1508,9 @@ public class TestClusterAccessor extends AbstractTestClass { Assert.assertEquals(auditLog.getResponseCode(), statusCode); Assert.assertEquals(auditLog.getResponseEntity(), responseEntity); } + + private Map<String, Object> getMapResponseFromRest(String uri) throws JsonProcessingException { + String response = get(uri, null, Response.Status.OK.getStatusCode(), true); + return OBJECT_MAPPER.readValue(response, new TypeReference<HashMap<String, Object>>() { }); + } }
