This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 33a28e761 Support Stoppable Check for Non-Topology-Aware Clusters
(#2961)
33a28e761 is described below
commit 33a28e761b9dd8d1ac790121a0dcae55f30e07f4
Author: Xiaxuan Gao <[email protected]>
AuthorDate: Wed Dec 11 14:31:23 2024 -0800
Support Stoppable Check for Non-Topology-Aware Clusters (#2961)
Support Stoppable Check for Non-Topology-Aware Clusters
---
.../helix/rest/client/CustomRestClientImpl.java | 11 +-
.../StoppableInstancesSelector.java | 59 +++++++--
.../server/resources/helix/InstancesAccessor.java | 55 +++++++--
.../helix/rest/server/service/ClusterService.java | 7 ++
.../rest/server/service/ClusterServiceImpl.java | 9 ++
.../helix/rest/client/TestCustomRestClient.java | 39 ++++++
.../helix/rest/server/AbstractTestClass.java | 62 ++++++++++
.../helix/rest/server/TestInstancesAccessor.java | 132 +++++++++++++++++++++
.../rest/server/service/TestClusterService.java | 24 ++++
9 files changed, 376 insertions(+), 22 deletions(-)
diff --git
a/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClientImpl.java
b/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClientImpl.java
index db943b3dd..b09116ac9 100644
---
a/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClientImpl.java
+++
b/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClientImpl.java
@@ -132,8 +132,15 @@ class CustomRestClientImpl implements CustomRestClient {
if (instances != null && !instances.isEmpty()) {
payLoads.put("instances", instances);
}
- if (toBeStoppedInstances != null && !toBeStoppedInstances.isEmpty()) {
- payLoads.put("to_be_stopped_instances", toBeStoppedInstances);
+ // Before sending the request, make sure the toBeStoppedInstances has no
overlap with instances
+ Set<String> remainingToBeStoppedInstances = toBeStoppedInstances;
+ if (instances != null && toBeStoppedInstances != null) {
+ remainingToBeStoppedInstances =
+ toBeStoppedInstances.stream().filter(ins -> !instances.contains(ins))
+ .collect(Collectors.toSet());
+ }
+ if (remainingToBeStoppedInstances != null &&
!remainingToBeStoppedInstances.isEmpty()) {
+ payLoads.put("to_be_stopped_instances", remainingToBeStoppedInstances);
}
if (clusterId != null) {
payLoads.put("cluster_id", clusterId);
diff --git
a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java
b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java
index bb5a2bc5c..891699100 100644
---
a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java
+++
b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java
@@ -86,8 +86,7 @@ public class StoppableInstancesSelector {
result.putArray(InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name());
ObjectNode failedStoppableInstances = result.putObject(
InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name());
- Set<String> toBeStoppedInstancesSet = new HashSet<>(toBeStoppedInstances);
- collectEvacuatingInstances(toBeStoppedInstancesSet);
+ Set<String> toBeStoppedInstancesSet =
findToBeStoppedInstances(toBeStoppedInstances);
List<String> zoneBasedInstance =
getZoneBasedInstances(instances, _clusterTopology.toZoneMapping());
@@ -118,8 +117,7 @@ public class StoppableInstancesSelector {
result.putArray(InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name());
ObjectNode failedStoppableInstances = result.putObject(
InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name());
- Set<String> toBeStoppedInstancesSet = new HashSet<>(toBeStoppedInstances);
- collectEvacuatingInstances(toBeStoppedInstancesSet);
+ Set<String> toBeStoppedInstancesSet =
findToBeStoppedInstances(toBeStoppedInstances);
Map<String, Set<String>> zoneMapping = _clusterTopology.toZoneMapping();
for (String zone : _orderOfZone) {
@@ -136,6 +134,39 @@ public class StoppableInstancesSelector {
return result;
}
+ /**
+ * Evaluates and collects stoppable instances not based on the zone order.
+ * The method iterates through instances, performing stoppable checks, and
records reasons for
+ * non-stoppability.
+ *
+ * @param instances A list of instances to be evaluated.
+ * @param toBeStoppedInstances A list of instances presumed to be already
stopped
+ * @return An ObjectNode containing:
+ * - 'instance_stoppable_parallel': List of instances that can be stopped.
+ * - 'instance_not_stoppable_with_reasons': A map with the instance
name as the key and
+ * a list of reasons for non-stoppability as the value.
+ * @throws IOException
+ */
+ public ObjectNode getStoppableInstancesNonZoneBased(List<String> instances,
+ List<String> toBeStoppedInstances) throws IOException {
+ ObjectNode result = JsonNodeFactory.instance.objectNode();
+ ArrayNode stoppableInstances =
+
result.putArray(InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name());
+ ObjectNode failedStoppableInstances = result.putObject(
+
InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name());
+ Set<String> toBeStoppedInstancesSet =
findToBeStoppedInstances(toBeStoppedInstances);
+
+ // Because zone order calculation is omitted, we must verify each
instance's existence
+ // to ensure we only process valid instances before performing stoppable
check.
+ Set<String> nonExistingInstances = processNonexistentInstances(instances,
failedStoppableInstances);
+ List<String> instancesToCheck = new ArrayList<>(instances);
+ instancesToCheck.removeAll(nonExistingInstances);
+ populateStoppableInstances(instancesToCheck, toBeStoppedInstancesSet,
stoppableInstances,
+ failedStoppableInstances);
+
+ return result;
+ }
+
private void populateStoppableInstances(List<String> instances, Set<String>
toBeStoppedInstances,
ArrayNode stoppableInstances, ObjectNode failedStoppableInstances)
throws IOException {
Map<String, StoppableCheck> instancesStoppableChecks =
@@ -159,7 +190,7 @@ public class StoppableInstancesSelector {
}
}
- private void processNonexistentInstances(List<String> instances, ObjectNode
failedStoppableInstances) {
+ private Set<String> processNonexistentInstances(List<String> instances,
ObjectNode failedStoppableInstances) {
// Adding following logic to check whether instances exist or not. An
instance exist could be
// checking following scenario:
// 1. Instance got dropped. (InstanceConfig is gone.)
@@ -174,6 +205,7 @@ public class StoppableInstancesSelector {
ArrayNode failedReasonsNode =
failedStoppableInstances.putArray(nonSelectedInstance);
failedReasonsNode.add(JsonNodeFactory.instance.textNode(INSTANCE_NOT_EXIST));
}
+ return nonSelectedInstances;
}
/**
@@ -258,21 +290,26 @@ public class StoppableInstancesSelector {
}
/**
- * Collect instances marked for evacuation in the current topology and add
them into the given set
+ * Combine the given instances with every instance within the cluster whose instance operation is set
to EVACUATE, SWAP_IN, or UNKNOWN,
+ * and return them as a set.
*
- * @param toBeStoppedInstances A set of instances we presume to be stopped.
+ * @param toBeStoppedInstances A list of instances we presume to be stopped.
*/
- private void collectEvacuatingInstances(Set<String> toBeStoppedInstances) {
+ private Set<String> findToBeStoppedInstances(List<String>
toBeStoppedInstances) {
+ Set<String> toBeStoppedInstancesSet = new HashSet<>(toBeStoppedInstances);
Set<String> allInstances = _clusterTopology.getAllInstances();
for (String instance : allInstances) {
PropertyKey.Builder propertyKeyBuilder = _dataAccessor.keyBuilder();
InstanceConfig instanceConfig =
_dataAccessor.getProperty(propertyKeyBuilder.instanceConfig(instance));
- if (InstanceConstants.InstanceOperation.EVACUATE.equals(
- instanceConfig.getInstanceOperation().getOperation())) {
- toBeStoppedInstances.add(instance);
+ InstanceConstants.InstanceOperation operation =
instanceConfig.getInstanceOperation().getOperation();
+ if (operation == InstanceConstants.InstanceOperation.EVACUATE
+ || operation == InstanceConstants.InstanceOperation.SWAP_IN
+ || operation == InstanceConstants.InstanceOperation.UNKNOWN) {
+ toBeStoppedInstancesSet.add(instance);
}
}
+ return toBeStoppedInstancesSet;
}
public static class StoppableInstancesSelectorBuilder {
diff --git
a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java
index d24ad9fce..52313a66b 100644
---
a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java
+++
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java
@@ -82,7 +82,7 @@ public class InstancesAccessor extends AbstractHelixResource {
}
public enum InstanceHealthSelectionBase {
- instance_based,
+ non_zone_based,
zone_based,
cross_zone_based
}
@@ -224,12 +224,17 @@ public class InstancesAccessor extends
AbstractHelixResource {
boolean random) throws IOException {
try {
// TODO: Process input data from the content
+ // TODO: Implement the logic to automatically detect the selection base.
https://github.com/apache/helix/issues/2968#issue-2691677799
InstancesAccessor.InstanceHealthSelectionBase selectionBase =
- InstancesAccessor.InstanceHealthSelectionBase.valueOf(
+
node.get(InstancesAccessor.InstancesProperties.selection_base.name()) == null
+ ? InstanceHealthSelectionBase.non_zone_based :
InstanceHealthSelectionBase.valueOf(
node.get(InstancesAccessor.InstancesProperties.selection_base.name()).textValue());
+
List<String> instances = OBJECT_MAPPER.readValue(
node.get(InstancesAccessor.InstancesProperties.instances.name()).toString(),
OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class,
String.class));
+ ClusterService clusterService =
+ new ClusterServiceImpl(getDataAccssor(clusterId),
getConfigAccessor());
List<String> orderOfZone = null;
String customizedInput = null;
@@ -252,6 +257,12 @@ public class InstancesAccessor extends
AbstractHelixResource {
_logger.error(message);
return badRequest(message);
}
+ if (!orderOfZone.isEmpty() && selectionBase ==
InstanceHealthSelectionBase.non_zone_based) {
+ String message =
+ "'zone_order' is set but 'selection_base' is 'non_zone_based'.
Please set 'selection_base' to 'zone_based' or 'cross_zone_based'.";
+ _logger.error(message);
+ return badRequest(message);
+ }
}
if
(node.get(InstancesAccessor.InstancesProperties.to_be_stopped_instances.name())
!= null) {
@@ -285,6 +296,33 @@ public class InstancesAccessor extends
AbstractHelixResource {
}
}
+ ClusterTopology clusterTopology =
clusterService.getClusterTopology(clusterId);
+ if (selectionBase != InstanceHealthSelectionBase.non_zone_based) {
+ if (!clusterService.isClusterTopologyAware(clusterId)) {
+ String message = "Cluster " + clusterId
+ + " is not topology aware. Please enable the topology in cluster
config or set "
+ + "'selection_base' to 'non_zone_based'.";
+ _logger.error(message);
+ return badRequest(message);
+ }
+
+ // Find instances that lack topology information
+ Set<String> instancesWithTopology =
+ clusterTopology.toZoneMapping().entrySet().stream().flatMap(entry
-> entry.getValue().stream())
+ .collect(Collectors.toSet());
+ Set<String> allInstances = clusterTopology.getAllInstances();
+ Set<String> topologyUnawareInstances = new
HashSet<>(instances).stream().filter(
+ instance -> !instancesWithTopology.contains(instance) &&
allInstances.contains(instance))
+ .collect(Collectors.toSet());
+ if (!topologyUnawareInstances.isEmpty()) {
+ String message = "Instances " + topologyUnawareInstances
+ + " do not have topology information. Please set topology
information in instance config or"
+ + " set 'selection_base' to 'non_zone_based'.";
+ _logger.error(message);
+ return badRequest(message);
+ }
+ }
+
String namespace = getNamespace();
MaintenanceManagementService maintenanceService =
new
MaintenanceManagementService.MaintenanceManagementServiceBuilder()
@@ -299,9 +337,6 @@ public class InstancesAccessor extends
AbstractHelixResource {
.setSkipStoppableHealthCheckList(skipStoppableCheckList)
.build();
- ClusterService clusterService =
- new ClusterServiceImpl(getDataAccssor(clusterId),
getConfigAccessor());
- ClusterTopology clusterTopology =
clusterService.getClusterTopology(clusterId);
StoppableInstancesSelector stoppableInstancesSelector =
new StoppableInstancesSelector.StoppableInstancesSelectorBuilder()
.setClusterId(clusterId)
@@ -311,18 +346,20 @@ public class InstancesAccessor extends
AbstractHelixResource {
.setClusterTopology(clusterTopology)
.setDataAccessor((ZKHelixDataAccessor) getDataAccssor(clusterId))
.build();
- stoppableInstancesSelector.calculateOrderOfZone(instances, random);
ObjectNode result;
- // TODO: Add support for clusters that do not have topology set up.
- // Issue #2893: https://github.com/apache/helix/issues/2893
+
switch (selectionBase) {
case zone_based:
+ stoppableInstancesSelector.calculateOrderOfZone(instances, random);
result =
stoppableInstancesSelector.getStoppableInstancesInSingleZone(instances,
toBeStoppedInstances);
break;
case cross_zone_based:
+ stoppableInstancesSelector.calculateOrderOfZone(instances, random);
result =
stoppableInstancesSelector.getStoppableInstancesCrossZones(instances,
toBeStoppedInstances);
break;
- case instance_based:
+ case non_zone_based:
+ result =
stoppableInstancesSelector.getStoppableInstancesNonZoneBased(instances,
toBeStoppedInstances);
+ break;
default:
throw new UnsupportedOperationException("instance_based selection is
not supported yet!");
}
diff --git
a/helix-rest/src/main/java/org/apache/helix/rest/server/service/ClusterService.java
b/helix-rest/src/main/java/org/apache/helix/rest/server/service/ClusterService.java
index d789e3615..db93571e8 100644
---
a/helix-rest/src/main/java/org/apache/helix/rest/server/service/ClusterService.java
+++
b/helix-rest/src/main/java/org/apache/helix/rest/server/service/ClusterService.java
@@ -41,4 +41,11 @@ public interface ClusterService {
* @return
*/
ClusterInfo getClusterInfo(String clusterId);
+
+ /**
+ * Check if the cluster is topology aware.
+ * @param clusterId The id of the cluster to check
+ * @return true if the cluster is topology aware, false otherwise
+ */
+ boolean isClusterTopologyAware(String clusterId);
}
diff --git
a/helix-rest/src/main/java/org/apache/helix/rest/server/service/ClusterServiceImpl.java
b/helix-rest/src/main/java/org/apache/helix/rest/server/service/ClusterServiceImpl.java
index b4667fb13..a152c3e64 100644
---
a/helix-rest/src/main/java/org/apache/helix/rest/server/service/ClusterServiceImpl.java
+++
b/helix-rest/src/main/java/org/apache/helix/rest/server/service/ClusterServiceImpl.java
@@ -25,10 +25,12 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import io.netty.util.internal.StringUtil;
import org.apache.helix.AccessOption;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.PropertyKey;
+import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.rest.server.json.cluster.ClusterInfo;
@@ -102,4 +104,11 @@ public class ClusterServiceImpl implements ClusterService {
.instances(_dataAccessor.getChildNames(keyBuilder.instances()))
.liveInstances(_dataAccessor.getChildNames(keyBuilder.liveInstances())).build();
}
+
+ @Override
+ public boolean isClusterTopologyAware(String clusterId) {
+ ClusterConfig config = _configAccessor.getClusterConfig(clusterId);
+ return config.isTopologyAwareEnabled() &&
!StringUtil.isNullOrEmpty(config.getFaultZoneType())
+ && !StringUtil.isNullOrEmpty(config.getTopology());
+ }
}
diff --git
a/helix-rest/src/test/java/org/apache/helix/rest/client/TestCustomRestClient.java
b/helix-rest/src/test/java/org/apache/helix/rest/client/TestCustomRestClient.java
index 3d81dc432..6c8f4be40 100644
---
a/helix-rest/src/test/java/org/apache/helix/rest/client/TestCustomRestClient.java
+++
b/helix-rest/src/test/java/org/apache/helix/rest/client/TestCustomRestClient.java
@@ -40,14 +40,18 @@ import org.apache.http.client.methods.HttpPost;
import org.apache.http.conn.ConnectTimeoutException;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
import org.junit.Assert;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class TestCustomRestClient {
@@ -260,4 +264,39 @@ public class TestCustomRestClient {
Assert.assertTrue(Arrays.stream(healthyInstances).allMatch(instance ->
clusterHealth.get(instance).isEmpty()));
Assert.assertTrue(Arrays.stream(nonStoppableInstances).noneMatch(instance
-> clusterHealth.get(instance).isEmpty()));
}
+
+ @Test(description = "Test if the aggregated stoppable check request has the
correct format when there"
+ + "are duplicate instances in the instances list and the
toBeStoppedInstances list.")
+ public void testAggregatedCheckRemoveDuplicateInstances()
+ throws IOException {
+ String clusterId = "cluster1";
+
+ MockCustomRestClient customRestClient = new
MockCustomRestClient(_httpClient);
+ HttpResponse httpResponse = mock(HttpResponse.class);
+ StatusLine statusLine = mock(StatusLine.class);
+
+ when(statusLine.getStatusCode()).thenReturn(HttpStatus.SC_OK);
+ when(httpResponse.getStatusLine()).thenReturn(statusLine);
+ when(_httpClient.execute(any(HttpPost.class))).thenReturn(httpResponse);
+
+ customRestClient.getAggregatedStoppableCheck(HTTP_LOCALHOST,
+ ImmutableList.of("n1", "n2"),
+ ImmutableSet.of("n1"), clusterId, Collections.emptyMap());
+
+ // Make sure that the duplicate instances are removed from the
toBeStoppedInstances list
+ ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ verify(_httpClient).execute(argThat(x -> {
+ String request = null;
+ try {
+ request = EntityUtils.toString(((HttpPost) x).getEntity());
+ JsonNode node = OBJECT_MAPPER.readTree(request);
+ String instancesInRequest = node.get("instances").toString();
+ Assert.assertEquals(instancesInRequest, "[\"n1\",\"n2\"]");
+ Assert.assertNull(node.get("to_be_stopped_instances"));
+ return true;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }));
+ }
}
diff --git
a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
index ad7c4482f..ee2346d2d 100644
---
a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
+++
b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
@@ -132,6 +132,7 @@ public class AbstractTestClass extends
JerseyTestNg.ContainerPerClassTest {
protected static BaseDataAccessor<ZNRecord> _baseAccessorTestNS;
protected static final String STOPPABLE_CLUSTER = "StoppableTestCluster";
protected static final String STOPPABLE_CLUSTER2 = "StoppableTestCluster2";
+ protected static final String STOPPABLE_CLUSTER3 = "StoppableTestCluster3";
protected static final String TASK_TEST_CLUSTER = "TaskTestCluster";
protected static final List<String> STOPPABLE_INSTANCES =
Arrays.asList("instance0", "instance1", "instance2", "instance3",
"instance4", "instance5");
@@ -343,6 +344,7 @@ public class AbstractTestClass extends
JerseyTestNg.ContainerPerClassTest {
}
preSetupForParallelInstancesStoppableTest(STOPPABLE_CLUSTER,
STOPPABLE_INSTANCES);
preSetupForCrosszoneParallelInstancesStoppableTest(STOPPABLE_CLUSTER2,
STOPPABLE_INSTANCES2);
+ preSetupForNonTopoAwareInstancesStoppableTest(STOPPABLE_CLUSTER3,
STOPPABLE_INSTANCES2);
}
protected Set<String> createInstances(String cluster, int numInstances) {
@@ -602,6 +604,8 @@ public class AbstractTestClass extends
JerseyTestNg.ContainerPerClassTest {
_gSetupTool.addCluster(clusterName, true);
ClusterConfig clusterConfig =
_configAccessor.getClusterConfig(clusterName);
clusterConfig.setFaultZoneType("helixZoneId");
+ clusterConfig.setTopologyAwareEnabled(true);
+ clusterConfig.setTopology("/helixZoneId/instance");
clusterConfig.setPersistIntermediateAssignment(true);
_configAccessor.setClusterConfig(clusterName, clusterConfig);
// Create instance configs
@@ -659,6 +663,8 @@ public class AbstractTestClass extends
JerseyTestNg.ContainerPerClassTest {
_gSetupTool.addCluster(clusterName, true);
ClusterConfig clusterConfig =
_configAccessor.getClusterConfig(clusterName);
clusterConfig.setFaultZoneType("helixZoneId");
+ clusterConfig.setTopologyAwareEnabled(true);
+ clusterConfig.setTopology("/helixZoneId/instance");
clusterConfig.setPersistIntermediateAssignment(true);
_configAccessor.setClusterConfig(clusterName, clusterConfig);
// Create instance configs
@@ -711,6 +717,62 @@ public class AbstractTestClass extends
JerseyTestNg.ContainerPerClassTest {
_clusters.add(clusterName);
_workflowMap.put(clusterName, createWorkflows(clusterName, 3));
}
+
+ private void preSetupForNonTopoAwareInstancesStoppableTest(String
clusterName,
+ List<String> instances) throws Exception {
+ _gSetupTool.addCluster(clusterName, true);
+ ClusterConfig clusterConfig =
_configAccessor.getClusterConfig(clusterName);
+ clusterConfig.setFaultZoneType("helixZoneId");
+ clusterConfig.setPersistIntermediateAssignment(true);
+ _configAccessor.setClusterConfig(clusterName, clusterConfig);
+ // Create instance configs that do not include the domain field
+ List<InstanceConfig> instanceConfigs = new ArrayList<>();
+ int perZoneInstancesCount = 3;
+ int curZoneCount = 0;
+ for (int i = 0; i < instances.size(); i++) {
+ InstanceConfig instanceConfig = new InstanceConfig(instances.get(i));
+ if (++curZoneCount >= perZoneInstancesCount) {
+ curZoneCount = 0;
+ }
+ instanceConfigs.add(instanceConfig);
+ }
+
+ for (InstanceConfig instanceConfig : instanceConfigs) {
+ _gSetupTool.getClusterManagementTool().addInstance(clusterName,
instanceConfig);
+ }
+
+ // Start participant
+ startInstances(clusterName, new TreeSet<>(instances), instances.size());
+ createResources(clusterName, 1, 2, 3);
+ _clusterControllerManagers.add(startController(clusterName));
+
+ // Make sure that cluster config exists
+ boolean isClusterConfigExist = TestHelper.verify(() -> {
+ ClusterConfig stoppableClusterConfig;
+ try {
+ stoppableClusterConfig = _configAccessor.getClusterConfig(clusterName);
+ } catch (Exception e) {
+ return false;
+ }
+ return (stoppableClusterConfig != null);
+ }, TestHelper.WAIT_DURATION);
+ Assert.assertTrue(isClusterConfigExist);
+ // Make sure that instance config exists for every instance in the given list
+ for (String instance: instances) {
+ boolean isinstanceConfigExist = TestHelper.verify(() -> {
+ InstanceConfig instanceConfig;
+ try {
+ instanceConfig = _configAccessor.getInstanceConfig(clusterName,
instance);
+ } catch (Exception e) {
+ return false;
+ }
+ return (instanceConfig != null);
+ }, TestHelper.WAIT_DURATION);
+ Assert.assertTrue(isinstanceConfigExist);
+ }
+ _clusters.add(clusterName);
+ _workflowMap.put(clusterName, createWorkflows(clusterName, 3));
+ }
/**
* Starts a HelixRestServer for the test suite.
* @return
diff --git
a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
index e1bbe7869..1c9cbcb51 100644
---
a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
+++
b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
@@ -30,6 +30,7 @@ import javax.ws.rs.client.Entity;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
@@ -633,6 +634,137 @@ public class TestInstancesAccessor extends
AbstractTestClass {
System.out.println("End test :" + TestHelper.getTestMethodName());
}
+ @Test(dependsOnMethods = "testSkipClusterLevelHealthCheck")
+ public void testNonTopoAwareStoppableCheck() throws JsonProcessingException {
+ System.out.println("Start test :" + TestHelper.getTestMethodName());
+
+ // STOPPABLE_CLUSTER3 is a non-topology-aware cluster
+ String content = String.format(
+ "{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\",\"%s\",\"%s\", \"%s\", \"%s\",
\"%s\",\"%s\", \"%s\", \"%s\"], \"%s\":[\"%s\", \"%s\"]}",
+ InstancesAccessor.InstancesProperties.selection_base.name(),
+ InstancesAccessor.InstanceHealthSelectionBase.non_zone_based.name(),
+ InstancesAccessor.InstancesProperties.instances.name(), "instance1",
"instance3",
+ "instance6", "instance9", "instance10", "instance11", "instance12",
"instance13",
+ "instance14", "invalidInstance",
+
InstancesAccessor.InstancesProperties.skip_stoppable_check_list.name(),
"INSTANCE_NOT_ENABLED", "INSTANCE_NOT_STABLE");
+
+ // Change instance config of instance0 to EVACUATE and instance1 to SWAP_IN
+ String instance0 = "instance0";
+ InstanceConfig instanceConfig =
+ _configAccessor.getInstanceConfig(STOPPABLE_CLUSTER3, instance0);
+
instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.EVACUATE);
+ _configAccessor.setInstanceConfig(STOPPABLE_CLUSTER3, instance0,
instanceConfig);
+ String instance1 = "instance1";
+ InstanceConfig instanceConfig1 =
+ _configAccessor.getInstanceConfig(STOPPABLE_CLUSTER3, instance1);
+
instanceConfig1.setInstanceOperation(InstanceConstants.InstanceOperation.SWAP_IN);
+ _configAccessor.setInstanceConfig(STOPPABLE_CLUSTER3, instance1,
instanceConfig1);
+
+ // It takes time to reflect the changes.
+ BestPossibleExternalViewVerifier verifier =
+ new
BestPossibleExternalViewVerifier.Builder(STOPPABLE_CLUSTER3).setZkAddr(ZK_ADDR).build();
+ Assert.assertTrue(verifier.verifyByPolling());
+
+ Response response = new JerseyUriRequestBuilder(
+
"clusters/{}/instances?command=stoppable&skipHealthCheckCategories=CUSTOM_INSTANCE_CHECK,CUSTOM_PARTITION_CHECK").format(
+ STOPPABLE_CLUSTER3).post(this, Entity.entity(content,
MediaType.APPLICATION_JSON_TYPE));
+ JsonNode jsonNode =
OBJECT_MAPPER.readTree(response.readEntity(String.class));
+
+ Set<String> stoppableSet = getStringSet(jsonNode,
+
InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name());
+ Assert.assertTrue(stoppableSet.contains("instance12") &&
stoppableSet.contains("instance3")
+ && stoppableSet.contains("instance10"));
+
+ JsonNode nonStoppableInstances = jsonNode.get(
+
InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name());
+ Assert.assertEquals(getStringSet(nonStoppableInstances, "instance13"),
+ ImmutableSet.of("HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED"));
+ Assert.assertEquals(getStringSet(nonStoppableInstances, "instance14"),
+ ImmutableSet.of("HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED"));
+ Assert.assertEquals(getStringSet(nonStoppableInstances, "invalidInstance"),
+ ImmutableSet.of("HELIX:INSTANCE_NOT_EXIST"));
+
instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE);
+ _configAccessor.setInstanceConfig(STOPPABLE_CLUSTER3, instance0,
instanceConfig);
+
instanceConfig1.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE);
+ _configAccessor.setInstanceConfig(STOPPABLE_CLUSTER3, instance1,
instanceConfig1);
+
+ System.out.println("End test :" + TestHelper.getTestMethodName());
+ }
+
+ @Test(dependsOnMethods = "testSkipClusterLevelHealthCheck")
+ public void testNonTopoAwareStoppableCheckWithException() throws
JsonProcessingException {
+ System.out.println("Start test :" + TestHelper.getTestMethodName());
+
+ // STOPPABLE_CLUSTER3 is a non-topology-aware cluster
+ String content = String.format(
+ "{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\",\"%s\",\"%s\", \"%s\", \"%s\",
\"%s\",\"%s\", \"%s\", \"%s\"], \"%s\":[\"%s\", \"%s\"]}",
+ InstancesAccessor.InstancesProperties.selection_base.name(),
+ InstancesAccessor.InstanceHealthSelectionBase.cross_zone_based.name(),
+ InstancesAccessor.InstancesProperties.instances.name(), "instance1",
"instance3",
+ "instance6", "instance9", "instance10", "instance11", "instance12",
"instance13",
+ "instance14", "invalidInstance",
+
InstancesAccessor.InstancesProperties.skip_stoppable_check_list.name(),
"INSTANCE_NOT_ENABLED", "INSTANCE_NOT_STABLE");
+
+ // It takes time to reflect the changes.
+ BestPossibleExternalViewVerifier verifier =
+ new
BestPossibleExternalViewVerifier.Builder(STOPPABLE_CLUSTER3).setZkAddr(ZK_ADDR).build();
+ Assert.assertTrue(verifier.verifyByPolling());
+
+ // Making the REST Call to cross zone stoppable check while the cluster
has no topology aware
+ // setup. The call should return an error.
+ Response response = new JerseyUriRequestBuilder(
+
"clusters/{}/instances?command=stoppable&skipHealthCheckCategories=CUSTOM_INSTANCE_CHECK,CUSTOM_PARTITION_CHECK").format(
+ STOPPABLE_CLUSTER3)
+ .isBodyReturnExpected(true)
+ .expectedReturnStatusCode(Response.Status.BAD_REQUEST.getStatusCode())
+ .post(this, Entity.entity(content, MediaType.APPLICATION_JSON_TYPE));
+
+ System.out.println("End test :" + TestHelper.getTestMethodName());
+ }
+
+ @Test(description = "Test zone selection base with instance that don't have
topology set in the config",
+ dependsOnMethods = "testNonTopoAwareStoppableCheckWithException")
+ public void testZoneSelectionBaseWithInstanceThatDontHaveTopologySet() {
+ System.out.println("Start test :" + TestHelper.getTestMethodName());
+
+ // STOPPABLE_CLUSTER3 is a non-topology-aware cluster
+ String content = String.format(
+ "{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\",\"%s\",\"%s\", \"%s\", \"%s\",
\"%s\",\"%s\", \"%s\", \"%s\"], \"%s\":[\"%s\", \"%s\"]}",
+ InstancesAccessor.InstancesProperties.selection_base.name(),
+ InstancesAccessor.InstanceHealthSelectionBase.cross_zone_based.name(),
+ InstancesAccessor.InstancesProperties.instances.name(), "instance1",
"instance3",
+ "instance6", "instance9", "instance10", "instance11", "instance12",
"instance13",
+ "instance14", "invalidInstance",
+
InstancesAccessor.InstancesProperties.skip_stoppable_check_list.name(),
"INSTANCE_NOT_ENABLED", "INSTANCE_NOT_STABLE");
+
+ String instance1 = "instance1";
+ InstanceConfig instanceConfig1 =
+ _configAccessor.getInstanceConfig(STOPPABLE_CLUSTER2, instance1);
+ String domain = instanceConfig1.getDomainAsString();
+ instanceConfig1.setDomain("FALSE_DOMAIN");
+ _configAccessor.setInstanceConfig(STOPPABLE_CLUSTER2, instance1,
instanceConfig1);
+
+ // It takes time to reflect the changes.
+ BestPossibleExternalViewVerifier verifier =
+ new
BestPossibleExternalViewVerifier.Builder(STOPPABLE_CLUSTER3).setZkAddr(ZK_ADDR).build();
+ Assert.assertTrue(verifier.verifyByPolling());
+
+ // Making the REST Call to cross zone stoppable check while the cluster
has no topology aware
+ // setup. The call should return an error.
+ Response response = new JerseyUriRequestBuilder(
+
"clusters/{}/instances?command=stoppable&skipHealthCheckCategories=CUSTOM_INSTANCE_CHECK,CUSTOM_PARTITION_CHECK").format(
+ STOPPABLE_CLUSTER3)
+ .isBodyReturnExpected(true)
+ .expectedReturnStatusCode(Response.Status.BAD_REQUEST.getStatusCode())
+ .post(this, Entity.entity(content, MediaType.APPLICATION_JSON_TYPE));
+
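+ // Restore the changes on instance 1. NOTE(review): the config was read from and modified in STOPPABLE_CLUSTER2 but is written back to STOPPABLE_CLUSTER3 here - confirm the intended cluster.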
+ instanceConfig1.setDomain(domain);
+ _configAccessor.setInstanceConfig(STOPPABLE_CLUSTER3, instance1,
instanceConfig1);
+
+ System.out.println("End test :" + TestHelper.getTestMethodName());
+ }
+
private Set<String> getStringSet(JsonNode jsonNode, String key) {
Set<String> result = new HashSet<>();
jsonNode.withArray(key).forEach(s -> result.add(s.textValue()));
diff --git
a/helix-rest/src/test/java/org/apache/helix/rest/server/service/TestClusterService.java
b/helix-rest/src/test/java/org/apache/helix/rest/server/service/TestClusterService.java
index 5786350d9..23182d2bb 100644
---
a/helix-rest/src/test/java/org/apache/helix/rest/server/service/TestClusterService.java
+++
b/helix-rest/src/test/java/org/apache/helix/rest/server/service/TestClusterService.java
@@ -96,6 +96,30 @@ public class TestClusterService {
Assert.assertEquals(clusterTopology.getClusterId(), TEST_CLUSTER);
}
+ @Test
+ public void testCheckTopologyAware() {
+ Mock mock = new Mock();
+
Assert.assertFalse(mock.clusterService.isClusterTopologyAware(TEST_CLUSTER));
+
+ ClusterConfig config = new ClusterConfig(TEST_CLUSTER);
+ config.setTopology("/zone");
+
when(mock.configAccessor.getClusterConfig(TEST_CLUSTER)).thenReturn(config);
+
Assert.assertFalse(mock.clusterService.isClusterTopologyAware(TEST_CLUSTER));
+
+ config = new ClusterConfig(TEST_CLUSTER);
+ config.setFaultZoneType("zone");
+ config.setTopology("/zone");
+
when(mock.configAccessor.getClusterConfig(TEST_CLUSTER)).thenReturn(config);
+
Assert.assertFalse(mock.clusterService.isClusterTopologyAware(TEST_CLUSTER));
+
+ config = new ClusterConfig(TEST_CLUSTER);
+ config.setFaultZoneType("zone");
+ config.setTopology("/zone");
+ config.setTopologyAwareEnabled(true);
+
when(mock.configAccessor.getClusterConfig(TEST_CLUSTER)).thenReturn(config);
+
Assert.assertTrue(mock.clusterService.isClusterTopologyAware(TEST_CLUSTER));
+ }
+
private final class Mock {
private HelixDataAccessor dataAccessor = mock(HelixDataAccessor.class);
private ConfigAccessor configAccessor = mock(ConfigAccessor.class);