This is an automated email from the ASF dual-hosted git repository.

zhangmeng pushed a commit to branch helix-virtual-group
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/helix-virtual-group by this push:
     new 35ca4cd  Add rest endpoint for virtual topology group (#1958)
35ca4cd is described below

commit 35ca4cd97574ce48fd8ad52801dd8d64065ee791
Author: Qi (Quincy) Qu <[email protected]>
AuthorDate: Wed Feb 16 12:55:56 2022 -0500

    Add rest endpoint for virtual topology group (#1958)
---
 .../rest/server/resources/AbstractResource.java    |   1 +
 .../server/resources/helix/ClusterAccessor.java    |  25 ++++
 .../helix/rest/server/TestClusterAccessor.java     | 156 ++++++++++++++++-----
 3 files changed, 147 insertions(+), 35 deletions(-)

diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
index a709d6a..b7d02a7 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
@@ -59,6 +59,7 @@ public class AbstractResource {
   public enum Command {
     activate,
     addInstanceTag,
+    addVirtualTopologyGroup,
     expand,
     enable,
     disable,
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java
index cf2457c..daea45f 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java
@@ -70,6 +70,7 @@ import org.apache.helix.rest.common.HttpConstants;
 import org.apache.helix.rest.server.json.cluster.ClusterTopology;
 import org.apache.helix.rest.server.service.ClusterService;
 import org.apache.helix.rest.server.service.ClusterServiceImpl;
+import org.apache.helix.rest.server.service.VirtualTopologyGroupService;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
@@ -235,6 +236,21 @@ public class ClusterAccessor extends AbstractHelixResource {
         }
         break;
 
+      case addVirtualTopologyGroup:
+        try {
+          addVirtualTopologyGroup(clusterId, content);
+        } catch (JsonProcessingException ex) {
+          LOG.error("Failed to parse json string: {}", content, ex);
+          return badRequest("Invalid payload json body: " + content);
+        } catch (IllegalArgumentException ex) {
+          LOG.error("Illegal input {} for command {}.", content, command, ex);
+          return badRequest(String.format("Illegal input %s for command %s", 
content, command));
+        } catch (Exception ex) {
+          LOG.error("Failed to add virtual topology group to cluster {}", 
clusterId, ex);
+          return serverError(ex);
+        }
+        break;
+
       case expand:
         try {
           clusterSetup.expandCluster(clusterId);
@@ -305,6 +321,15 @@ public class ClusterAccessor extends AbstractHelixResource {
     return OK();
   }
 
+  private void addVirtualTopologyGroup(String clusterId, String content) 
throws JsonProcessingException {
+    ClusterService clusterService = new 
ClusterServiceImpl(getDataAccssor(clusterId), getConfigAccessor());
+    VirtualTopologyGroupService service = new VirtualTopologyGroupService(
+        getHelixAdmin(), clusterService, getConfigAccessor(), 
getDataAccssor(clusterId));
+    Map<String, String> customFieldsMap =
+        OBJECT_MAPPER.readValue(content, new TypeReference<HashMap<String, 
String>>() { });
+    service.addVirtualTopologyGroup(clusterId, customFieldsMap);
+  }
+
   @ResponseMetered(name = HttpConstants.READ_REQUEST)
   @Timed(name = HttpConstants.READ_REQUEST)
   @GET
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java
index 3bb3c29..34cfbaf 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java
@@ -19,6 +19,7 @@ package org.apache.helix.rest.server;
  * under the License.
  */
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -45,6 +46,7 @@ import org.apache.helix.api.status.ClusterManagementMode;
 import org.apache.helix.api.status.ClusterManagementModeRequest;
 import org.apache.helix.cloud.azure.AzureConstants;
 import org.apache.helix.cloud.constants.CloudProvider;
+import org.apache.helix.cloud.constants.VirtualTopologyGroupConstants;
 import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
 import org.apache.helix.integration.manager.ClusterDistributedController;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
@@ -70,10 +72,13 @@ import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 public class TestClusterAccessor extends AbstractTestClass {
 
+  private static final String VG_CLUSTER = "vgCluster";
+
   @BeforeClass
   public void beforeClass() {
     for (String cluster : _clusters) {
@@ -167,10 +172,7 @@ public class TestClusterAccessor extends AbstractTestClass {
     updateClusterConfigFromRest(cluster, configDelta, Command.update);
 
     //get valid cluster topology map
-    String topologyMapDef = get(topologyMapUrlBase, null, 
Response.Status.OK.getStatusCode(), true);
-    Map<String, Object> topologyMap =
-        OBJECT_MAPPER.readValue(topologyMapDef, new 
TypeReference<HashMap<String, Object>>() {
-        });
+    Map<String, Object> topologyMap = 
getMapResponseFromRest(topologyMapUrlBase);
     Assert.assertEquals(topologyMap.size(), 2);
     Assert.assertTrue(topologyMap.get("/helixZoneId:zone0") instanceof List);
     List<String> instances = (List<String>) 
topologyMap.get("/helixZoneId:zone0");
@@ -197,10 +199,7 @@ public class TestClusterAccessor extends AbstractTestClass {
     updateClusterConfigFromRest(cluster, configDelta, Command.update);
 
     //get valid cluster fault zone map
-    String faultZoneMapDef = get(faultZoneUrlBase, null, 
Response.Status.OK.getStatusCode(), true);
-    Map<String, Object> faultZoneMap =
-        OBJECT_MAPPER.readValue(faultZoneMapDef, new 
TypeReference<HashMap<String, Object>>() {
-        });
+    Map<String, Object> faultZoneMap = 
getMapResponseFromRest(faultZoneUrlBase);
     Assert.assertEquals(faultZoneMap.size(), 2);
     Assert.assertTrue(faultZoneMap.get("/helixZoneId:zone0") instanceof List);
     instances = (List<String>) faultZoneMap.get("/helixZoneId:zone0");
@@ -223,6 +222,108 @@ public class TestClusterAccessor extends AbstractTestClass {
             "/instance:TestCluster_1localhost_12927"))));
   }
 
+  @Test(dataProvider = "prepareVirtualTopologyTests", dependsOnMethods = 
"testGetClusters")
+  public void testAddVirtualTopologyGroup(String requestParam, int numGroups,
+      Map<String, String> instanceToGroup) throws IOException {
+    post("clusters/" + VG_CLUSTER,
+        ImmutableMap.of("command", "addVirtualTopologyGroup"),
+        Entity.entity(requestParam, MediaType.APPLICATION_JSON_TYPE),
+        Response.Status.OK.getStatusCode());
+    Map<String, Object> topology = 
getMapResponseFromRest(String.format("clusters/%s/topology", VG_CLUSTER));
+    Assert.assertTrue(topology.containsKey("zones"));
+    Assert.assertEquals(((List) topology.get("zones")).size(), numGroups);
+
+    ClusterConfig clusterConfig = getClusterConfigFromRest(VG_CLUSTER);
+    String expectedTopology = "/" + 
VirtualTopologyGroupConstants.VIRTUAL_FAULT_ZONE_TYPE + "/hostname";
+    Assert.assertEquals(clusterConfig.getTopology(), expectedTopology);
+    Assert.assertEquals(clusterConfig.getFaultZoneType(), 
VirtualTopologyGroupConstants.VIRTUAL_FAULT_ZONE_TYPE);
+
+    HelixDataAccessor helixDataAccessor = new ZKHelixDataAccessor(VG_CLUSTER, 
_baseAccessor);
+    for (Map.Entry<String, String> entry : instanceToGroup.entrySet()) {
+      InstanceConfig instanceConfig =
+          
helixDataAccessor.getProperty(helixDataAccessor.keyBuilder().instanceConfig(entry.getKey()));
+      String expectedGroup = entry.getValue();
+      
Assert.assertEquals(instanceConfig.getDomainAsMap().get(VirtualTopologyGroupConstants.VIRTUAL_FAULT_ZONE_TYPE),
+          expectedGroup);
+    }
+  }
+
+  @Test(dependsOnMethods = "testGetClusters")
+  public void testVirtualTopologyGroupMaintenanceMode() throws 
JsonProcessingException {
+    setupClusterForVirtualTopology(VG_CLUSTER);
+    String requestParam = 
"{\"virtualTopologyGroupNumber\":\"7\",\"virtualTopologyGroupName\":\"vgTest\","
+        + "\"autoMaintenanceModeDisabled\":\"true\"}";
+    // expect failure as cluster is not in maintenance mode while 
autoMaintenanceModeDisabled=true
+    post("clusters/" + VG_CLUSTER,
+        ImmutableMap.of("command", "addVirtualTopologyGroup"),
+        Entity.entity(requestParam, MediaType.APPLICATION_JSON_TYPE),
+        Response.Status.INTERNAL_SERVER_ERROR.getStatusCode());
+    // enable maintenance mode and expect success
+    post("clusters/" + VG_CLUSTER,
+        ImmutableMap.of("command", "enableMaintenanceMode"),
+        Entity.entity("virtual group", MediaType.APPLICATION_JSON_TYPE),
+        Response.Status.OK.getStatusCode());
+    post("clusters/" + VG_CLUSTER,
+        ImmutableMap.of("command", "addVirtualTopologyGroup"),
+        Entity.entity(requestParam, MediaType.APPLICATION_JSON_TYPE),
+        Response.Status.OK.getStatusCode());
+
+    Assert.assertTrue(isMaintenanceModeEnabled(VG_CLUSTER));
+  }
+
+  private boolean isMaintenanceModeEnabled(String clusterName) throws 
JsonProcessingException {
+    String body =
+        get("clusters/" + clusterName + "/maintenance", null, 
Response.Status.OK.getStatusCode(), true);
+    return 
OBJECT_MAPPER.readTree(body).get(ClusterAccessor.ClusterProperties.maintenance.name()).booleanValue();
+  }
+
+  @DataProvider
+  public Object[][] prepareVirtualTopologyTests() {
+    setupClusterForVirtualTopology(VG_CLUSTER);
+    String test1 = 
"{\"virtualTopologyGroupNumber\":\"7\",\"virtualTopologyGroupName\":\"vgTest\"}";
+    String test2 = 
"{\"virtualTopologyGroupNumber\":\"9\",\"virtualTopologyGroupName\":\"vgTest\"}";
+    return new Object[][] {
+        {test1, 7, ImmutableMap.of(
+            "vgCluster_localhost_12918", "vgTest_0",
+            "vgCluster_localhost_12919", "vgTest_0",
+            "vgCluster_localhost_12925", "vgTest_4",
+            "vgCluster_localhost_12927", "vgTest_6")},
+        {test2, 9, ImmutableMap.of(
+            "vgCluster_localhost_12918", "vgTest_0",
+            "vgCluster_localhost_12919", "vgTest_0",
+            "vgCluster_localhost_12925", "vgTest_6",
+            "vgCluster_localhost_12927", "vgTest_8")},
+        // repeat test1 for deterministic and test for decreasing numGroups
+        {test1, 7, ImmutableMap.of(
+            "vgCluster_localhost_12918", "vgTest_0",
+            "vgCluster_localhost_12919", "vgTest_0",
+            "vgCluster_localhost_12925", "vgTest_4",
+            "vgCluster_localhost_12927", "vgTest_6")}
+    };
+  }
+
+  private void setupClusterForVirtualTopology(String clusterName) {
+    HelixDataAccessor helixDataAccessor = new ZKHelixDataAccessor(clusterName, 
_baseAccessor);
+    ZNRecord record = new ZNRecord("testZnode");
+    
record.setBooleanField(CloudConfig.CloudConfigProperty.CLOUD_ENABLED.name(), 
true);
+    record.setSimpleField(CloudConfig.CloudConfigProperty.CLOUD_ID.name(), 
"TestCloudID");
+    
record.setSimpleField(CloudConfig.CloudConfigProperty.CLOUD_PROVIDER.name(), 
CloudProvider.AZURE.name());
+    CloudConfig cloudConfig = new CloudConfig.Builder(record).build();
+    _gSetupTool.addCluster(clusterName, true, cloudConfig);
+
+    Set<String> instances = new HashSet<>();
+    for (int i = 0; i < 10; i++) {
+      String instanceName = clusterName + "_localhost_" + (12918 + i);
+      _gSetupTool.addInstanceToCluster(clusterName, instanceName);
+      InstanceConfig instanceConfig =
+          
helixDataAccessor.getProperty(helixDataAccessor.keyBuilder().instanceConfig(instanceName));
+      instanceConfig.setDomain("faultDomain=" + i / 2 + ",hostname=" + 
instanceName);
+      
helixDataAccessor.setProperty(helixDataAccessor.keyBuilder().instanceConfig(instanceName),
 instanceConfig);
+      instances.add(instanceName);
+    }
+    startInstances(clusterName, instances, 10);
+  }
+
   @Test(dependsOnMethods = "testGetClusterTopologyAndFaultZoneMap")
   public void testAddConfigFields() throws IOException {
     System.out.println("Start test :" + TestHelper.getTestMethodName());
@@ -399,19 +500,11 @@ public class TestClusterAccessor extends AbstractTestClass {
         Entity.entity(reason, MediaType.APPLICATION_JSON_TYPE), 
Response.Status.OK.getStatusCode());
 
     // verify is in maintenance mode
-    String body =
-        get("clusters/" + cluster + "/maintenance", null, 
Response.Status.OK.getStatusCode(), true);
-    JsonNode node = OBJECT_MAPPER.readTree(body);
-    boolean maintenance =
-        
node.get(ClusterAccessor.ClusterProperties.maintenance.name()).booleanValue();
-    Assert.assertTrue(maintenance);
+    Assert.assertTrue(isMaintenanceModeEnabled(cluster));
 
     // Check that we could retrieve maintenance signal correctly
-    String signal = get("clusters/" + cluster + 
"/controller/maintenanceSignal", null,
-        Response.Status.OK.getStatusCode(), true);
     Map<String, Object> maintenanceSignalMap =
-        OBJECT_MAPPER.readValue(signal, new TypeReference<HashMap<String, 
Object>>() {
-        });
+        getMapResponseFromRest("clusters/" + cluster + 
"/controller/maintenanceSignal");
     Assert.assertEquals(maintenanceSignalMap.get("TRIGGERED_BY"), "USER");
     Assert.assertEquals(maintenanceSignalMap.get("REASON"), reason);
     Assert.assertNotNull(maintenanceSignalMap.get("TIMESTAMP"));
@@ -422,10 +515,7 @@ public class TestClusterAccessor extends AbstractTestClass {
         Entity.entity("", MediaType.APPLICATION_JSON_TYPE), 
Response.Status.OK.getStatusCode());
 
     // verify no longer in maintenance mode
-    body = get("clusters/" + cluster + "/maintenance", null, 
Response.Status.OK.getStatusCode(), true);
-    node = OBJECT_MAPPER.readTree(body);
-    Assert.assertFalse(
-        
node.get(ClusterAccessor.ClusterProperties.maintenance.name()).booleanValue());
+    Assert.assertFalse(isMaintenanceModeEnabled(cluster));
 
     get("clusters/" + cluster + "/controller/maintenanceSignal", null,
         Response.Status.NOT_FOUND.getStatusCode(), false);
@@ -448,11 +538,8 @@ public class TestClusterAccessor extends AbstractTestClass {
     Assert.assertNotNull(leader, "Leader name cannot be null!");
 
     // Get the controller leadership history JSON's last entry
-    String leadershipHistory = get("clusters/" + cluster + 
"/controller/history", null,
-        Response.Status.OK.getStatusCode(), true);
-    Map<String, Object> leadershipHistoryMap =
-        OBJECT_MAPPER.readValue(leadershipHistory, new 
TypeReference<HashMap<String, Object>>() {
-        });
+    Map<String, Object> leadershipHistoryMap = 
getMapResponseFromRest("clusters/" + cluster + "/controller/history");
+
     Assert.assertNotNull(leadershipHistoryMap, "Leadership history cannot be 
null!");
     Object leadershipHistoryList =
         leadershipHistoryMap.get(AbstractResource.Properties.history.name());
@@ -477,11 +564,8 @@ public class TestClusterAccessor extends AbstractTestClass {
         Entity.entity(reason, MediaType.APPLICATION_JSON_TYPE), 
Response.Status.OK.getStatusCode());
 
     // Get the maintenance history JSON's last entry
-    String maintenanceHistory = get("clusters/" + cluster + 
"/controller/maintenanceHistory", null,
-        Response.Status.OK.getStatusCode(), true);
     Map<String, Object> maintenanceHistoryMap =
-        OBJECT_MAPPER.readValue(maintenanceHistory, new 
TypeReference<HashMap<String, Object>>() {
-        });
+        getMapResponseFromRest("clusters/" + cluster + 
"/controller/maintenanceHistory");
     Object maintenanceHistoryList =
         
maintenanceHistoryMap.get(ClusterAccessor.ClusterProperties.maintenanceHistory.name());
     Assert.assertNotNull(maintenanceHistoryList);
@@ -571,10 +655,7 @@ public class TestClusterAccessor extends AbstractTestClass {
     System.out.println("Start test :" + TestHelper.getTestMethodName());
     String cluster = "TestCluster_1";
     String urlBase = "clusters/TestCluster_1/statemodeldefs/";
-    String stateModelDefs =
-        get(urlBase, null, Response.Status.OK.getStatusCode(), true);
-    Map<String, Object> defMap = OBJECT_MAPPER.readValue(stateModelDefs, new 
TypeReference<HashMap<String, Object>>() {
-    });
+    Map<String, Object> defMap = getMapResponseFromRest(urlBase);
 
     Assert.assertTrue(defMap.size() == 2);
     Assert.assertTrue(defMap.get("stateModelDefinitions") instanceof List);
@@ -1427,4 +1508,9 @@ public class TestClusterAccessor extends AbstractTestClass {
     Assert.assertEquals(auditLog.getResponseCode(), statusCode);
     Assert.assertEquals(auditLog.getResponseEntity(), responseEntity);
   }
+
+  private Map<String, Object> getMapResponseFromRest(String uri) throws 
JsonProcessingException {
+    String response = get(uri, null, Response.Status.OK.getStatusCode(), true);
+    return OBJECT_MAPPER.readValue(response, new TypeReference<HashMap<String, 
Object>>() { });
+  }
 }

Reply via email to