This is an automated email from the ASF dual-hosted git repository.
hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 204f034 Add admin and REST API to purge instances that have been
offline for longer than a specified period of time (#1494)
204f034 is described below
commit 204f034e4f8e5f49127b702b7a129892240c945c
Author: Meng Zhang <[email protected]>
AuthorDate: Sun Nov 1 22:12:50 2020 -0800
Add admin and REST API to purge instances that have been offline for longer than
a specified period of time (#1494)
This commit adds an admin API and a REST API to purge instances that have been
offline for longer than a specified period of time.
---
.../src/main/java/org/apache/helix/HelixAdmin.java | 10 ++++
.../org/apache/helix/manager/zk/ZKHelixAdmin.java | 61 ++++++++++++++++++++++
.../java/org/apache/helix/model/ClusterConfig.java | 28 +++++-----
.../apache/helix/manager/zk/TestZkHelixAdmin.java | 54 ++++++++++++++++++-
.../java/org/apache/helix/mock/MockHelixAdmin.java | 4 ++
.../org/apache/helix/model/TestClusterConfig.java | 10 ++--
.../rest/server/resources/AbstractResource.java | 1 +
.../server/resources/helix/ClusterAccessor.java | 10 +++-
.../helix/rest/server/TestClusterAccessor.java | 45 ++++++++++++++++
9 files changed, 203 insertions(+), 20 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
index 3618b12..3c456bb 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
@@ -229,6 +229,16 @@ public interface HelixAdmin {
void dropInstance(String clusterName, InstanceConfig instanceConfig);
/**
+ * Purge instances that have been offline for longer than the given offline duration from a
+ * cluster.
+ * @param clusterName name of the cluster to purge
+ * @param offlineDuration offline duration in milliseconds; an offline instance that has been
+ *                        offline for longer than this becomes eligible for being purged/deleted.
+ *                        The ZKHelixAdmin implementation treats
+ *                        ClusterConfig.OFFLINE_DURATION_FOR_PURGE_NOT_SET as "use the default
+ *                        duration stored in the cluster config".
+ */
+ void purgeOfflineInstances(String clusterName, long offlineDuration);
+
+ /**
* Get ideal state for a resource
* @param clusterName
* @param resourceName
diff --git
a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index 1267161..7b946b1 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -76,6 +76,7 @@ import org.apache.helix.model.MaintenanceSignal;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.MessageState;
import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.model.ParticipantHistory;
import org.apache.helix.model.PauseSignal;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.StateModelDefinition;
@@ -250,6 +251,33 @@ public class ZKHelixAdmin implements HelixAdmin {
}
}
+  /**
+   * Drops every instance that has been offline for longer than {@code offlineDuration}.
+   *
+   * <p>Please note that the purge function should only be called when there is no new instance
+   * joining happening in the cluster. The current implementation is not thread safe: if an
+   * offline instance comes online while the purging is ongoing, a race condition may happen and
+   * we may end up with a live instance in the cluster without a corresponding instance config.
+   * TODO: consider using Helix lock to prevent race condition, and make sure zookeeper is ok
+   * with the extra traffic caused by lock.
+   *
+   * <p>An instance whose drop fails with a {@link HelixException} is skipped. The failure and its
+   * cause are logged, and the purge continues with the remaining instances. A summary of all
+   * failed instance names is logged at the end.
+   *
+   * @param clusterName cluster to purge
+   * @param offlineDuration minimum offline time in milliseconds, or
+   *     {@link ClusterConfig#OFFLINE_DURATION_FOR_PURGE_NOT_SET} to use the cluster-config default
+   */
+  @Override
+  public void purgeOfflineInstances(String clusterName, long offlineDuration) {
+    Map<String, InstanceConfig> timeoutOfflineInstances =
+        findTimeoutOfflineInstances(clusterName, offlineDuration);
+    List<String> failToPurgeInstances = new ArrayList<>();
+    for (InstanceConfig instance : timeoutOfflineInstances.values()) {
+      try {
+        dropInstance(clusterName, instance);
+      } catch (HelixException e) {
+        // Best-effort: keep purging the others, but do not lose the reason this one failed.
+        LOG.warn("ZKHelixAdmin::purgeOfflineInstances(): failed to drop instance "
+            + instance.getInstanceName() + " from cluster " + clusterName, e);
+        failToPurgeInstances.add(instance.getInstanceName());
+      }
+    }
+    if (!failToPurgeInstances.isEmpty()) {
+      LOG.error("ZKHelixAdmin::purgeOfflineInstances(): failed to drop the following instances: "
+          + failToPurgeInstances);
+    }
+  }
+
@Override
public InstanceConfig getInstanceConfig(String clusterName, String
instanceName) {
logger.info("Get instance config for instance {} from cluster {}.",
instanceName, clusterName);
@@ -2061,4 +2089,37 @@ public class ZKHelixAdmin implements HelixAdmin {
_zkAddress), false);
}
}
+
+  /**
+   * Finds the non-live instances whose last recorded offline time is at least
+   * {@code offlineDuration} milliseconds in the past.
+   *
+   * @param clusterName cluster to inspect
+   * @param offlineDuration minimum offline time in milliseconds, or
+   *     {@link ClusterConfig#OFFLINE_DURATION_FOR_PURGE_NOT_SET} to fall back to the duration
+   *     stored in the cluster config
+   * @return map from instance name to instance config of the instances eligible for purging;
+   *     empty when no duration is given and none is configured in the cluster config
+   */
+  private Map<String, InstanceConfig> findTimeoutOfflineInstances(String clusterName,
+      long offlineDuration) {
+    // In case there is no customized duration, use the one defined in the cluster config.
+    if (offlineDuration == ClusterConfig.OFFLINE_DURATION_FOR_PURGE_NOT_SET) {
+      offlineDuration = _configAccessor.getClusterConfig(clusterName).getOfflineDurationForPurge();
+      if (offlineDuration == ClusterConfig.OFFLINE_DURATION_FOR_PURGE_NOT_SET) {
+        return new HashMap<>();
+      }
+    }
+    // Effectively-final copy so the filter lambda below can capture it.
+    final long effectiveOfflineDuration = offlineDuration;
+
+    HelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    Map<String, InstanceConfig> instanceConfigMap =
+        accessor.getChildValuesMap(keyBuilder.instanceConfigs(), true);
+    // Live instances are never purge candidates.
+    instanceConfigMap.keySet().removeAll(accessor.getChildNames(keyBuilder.liveInstances()));
+
+    // Sample the clock once so every instance is judged against the same reference time.
+    final long now = System.currentTimeMillis();
+    // The predicate returns true for instances that must NOT be purged, removing them from the
+    // candidate map.
+    instanceConfigMap.keySet().removeIf(instanceName -> {
+      ParticipantHistory participantHistory =
+          accessor.getProperty(keyBuilder.participantHistory(instanceName));
+      if (participantHistory == null) {
+        // No participant history (e.g. the instance was added but never started): its offline
+        // duration cannot be determined, so keep it instead of failing the whole purge with an
+        // NPE.
+        return true;
+      }
+      long lastOfflineTime = participantHistory.getLastOfflineTime();
+      // A negative value means no offline time was recorded. NOTE(review): relies on
+      // ParticipantHistory reporting -1 for an unset LAST_OFFLINE_TIME, the same value the
+      // previous comparison against ClusterConfig.OFFLINE_DURATION_FOR_PURGE_NOT_SET relied on.
+      return lastOfflineTime < 0 || now - lastOfflineTime < effectiveOfflineDuration;
+    });
+    return instanceConfigMap;
+  }
}
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
index ef87542..492ac7f 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -134,7 +134,7 @@ public class ClusterConfig extends HelixProperty {
// offline for more than this specified time period, and users call purge
participant API,
// then the node will be removed.
// The unit is milliseconds.
- OFFLINE_NODE_TIME_OUT_FOR_PURGE
+ OFFLINE_DURATION_FOR_PURGE_MS
}
public enum GlobalRebalancePreferenceKey {
@@ -153,6 +153,8 @@ public class ClusterConfig extends HelixProperty {
public final static String TASK_QUOTA_RATIO_NOT_SET = "-1";
+ public static final long OFFLINE_DURATION_FOR_PURGE_NOT_SET = -1L;
+
// Default preference for all the aspects should be the same to ensure
balanced setup.
public final static Map<GlobalRebalancePreferenceKey, Integer>
DEFAULT_GLOBAL_REBALANCE_PREFERENCE =
@@ -163,7 +165,7 @@ public class ClusterConfig extends HelixProperty {
private final static int MIN_REBALANCE_PREFERENCE = 0;
public final static boolean DEFAULT_GLOBAL_REBALANCE_ASYNC_MODE_ENABLED =
true;
private static final int GLOBAL_TARGET_TASK_THREAD_POOL_SIZE_NOT_SET = -1;
- private static final long VALUE_NOT_SET = -1;
+ private static final int OFFLINE_NODE_TIME_OUT_FOR_MAINTENANCE_MODE_NOT_SET
= -1;
/**
* Instantiate for a specific cluster
@@ -940,28 +942,28 @@ public class ClusterConfig extends HelixProperty {
public long getOfflineNodeTimeOutForMaintenanceMode() {
return _record
.getLongField(ClusterConfigProperty.OFFLINE_NODE_TIME_OUT_FOR_MAINTENANCE_MODE.name(),
- VALUE_NOT_SET);
+ OFFLINE_NODE_TIME_OUT_FOR_MAINTENANCE_MODE_NOT_SET);
}
/**
- * Set the default time out window for offline nodes to be purged. If an
offline node has been
+ * Set the default duration for offline nodes to be purged. If an offline
node has been
* offline for more than this specified time period, when users call purge
participants API,
* the node will be dropped.
- * @param timeOut timeout window in milliseconds.
+ * @param offlineDuration offline duration in milliseconds.
*/
- public void setOfflineNodeTimeOutForPurge(long timeOut) {
-
_record.setLongField(ClusterConfigProperty.OFFLINE_NODE_TIME_OUT_FOR_PURGE.name(),
- timeOut);
+ public void setOfflineDurationForPurge(long offlineDuration) {
+
_record.setLongField(ClusterConfigProperty.OFFLINE_DURATION_FOR_PURGE_MS.name(),
+ offlineDuration);
}
/**
- * Get the default time out window for offline nodes to be purged.
- * @return timeout window in milliseconds
+ * Get the default offline duration for offline nodes to be purged.
+ * @return offline duration in milliseconds
*/
- public long getOfflineNodeTimeOutForPurge() {
+ public long getOfflineDurationForPurge() {
return _record
-
.getLongField(ClusterConfigProperty.OFFLINE_NODE_TIME_OUT_FOR_PURGE.name(),
- VALUE_NOT_SET);
+
.getLongField(ClusterConfigProperty.OFFLINE_DURATION_FOR_PURGE_MS.name(),
+ OFFLINE_DURATION_FOR_PURGE_NOT_SET);
}
/**
diff --git
a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
index 6585439..ce6ea9d 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
@@ -83,7 +83,6 @@ import org.testng.AssertJUnit;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-
public class TestZkHelixAdmin extends ZkUnitTestBase {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@@ -1008,4 +1007,57 @@ public class TestZkHelixAdmin extends ZkUnitTestBase {
Assert.assertEquals(listTypesFromZk.get(0), "mockType2");
Assert.assertEquals(listTypesFromZk.get(1), "mockType3");
}
+
+ /**
+ * Verifies that purgeOfflineInstances keeps an offline instance whose offline time
+ * (about 50000 ms) is below the cluster-config default (100000 ms), and drops it once a smaller
+ * customized duration (10000 ms) is passed explicitly.
+ */
+ @Test
+ public void testPurgeOfflineInstances() {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+
+ System.out.println("START " + clusterName + " at " + new
Date(System.currentTimeMillis()));
+
+ HelixAdmin tool = new ZKHelixAdmin(_gZkClient);
+ tool.addCluster(clusterName, true);
+
+ HelixDataAccessor dataAccessor = new ZKHelixDataAccessor(clusterName,
_baseAccessor);
+ PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder();
+
+ // Set the default offline duration for purge (100000 ms) in the cluster config.
+ ClusterConfig clusterConfig =
dataAccessor.getProperty(keyBuilder.clusterConfig());
+ clusterConfig.setOfflineDurationForPurge(100000L);
+ dataAccessor.setProperty(keyBuilder.clusterConfig(), clusterConfig);
+
+ String hostname = "host1";
+ String port = "9999";
+ String instanceName = hostname + "_" + port;
+ InstanceConfig config = new InstanceConfig(instanceName);
+ config.setHostName(hostname);
+ config.setPort(port);
+ tool.addInstance(clusterName, config);
+ tool.enableInstance(clusterName, instanceName, true);
+
+ // Register the instance as live, then remove its live-instance node and write a participant
+ // history whose LAST_OFFLINE_TIME is 50000 ms in the past, simulating an instance that went
+ // offline about 50 seconds ago.
+ LiveInstance liveInstance = new LiveInstance(instanceName);
+ liveInstance.setSessionId(UUID.randomUUID().toString());
+ liveInstance.setHelixVersion(UUID.randomUUID().toString());
+ dataAccessor.setProperty(keyBuilder.liveInstance(instanceName),
liveInstance);
+
+ dataAccessor.removeProperty(keyBuilder.liveInstance(instanceName));
+ ZNRecord znRecord = new ZNRecord(instanceName);
+ znRecord
+ .setSimpleField("LAST_OFFLINE_TIME",
String.valueOf(System.currentTimeMillis() - 50000L));
+ _baseAccessor.set(PropertyPathBuilder.instanceHistory(clusterName,
instanceName), znRecord, 1);
+
+ // This purge will not remove the instance: it has been offline for about 50000 ms, less than
+ // the default offline duration of 100000 ms.
+ tool.purgeOfflineInstances(clusterName,
ClusterConfig.OFFLINE_DURATION_FOR_PURGE_NOT_SET);
+
Assert.assertTrue(_gZkClient.exists(keyBuilder.instanceConfig(instanceName).getPath()),
+ "Instance should still be there");
+
+ // This purge will remove the instance as the customized offline duration (10000 ms) is met.
+ tool.purgeOfflineInstances(clusterName, 10000L);
+
Assert.assertFalse(_gZkClient.exists(keyBuilder.instanceConfig(instanceName).getPath()),
+ "Instance should already be dropped");
+
+ tool.dropCluster(clusterName);
+ System.out.println("END " + clusterName + " at " + new
Date(System.currentTimeMillis()));
+ }
}
diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
index 44cdb0a..e954c93 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
@@ -238,6 +238,10 @@ public class MockHelixAdmin implements HelixAdmin {
}
+ /**
+ * No-op stub: this mock admin does not purge any instances.
+ */
+ @Override
+ public void purgeOfflineInstances(String clusterName, long offlineDuration) {
+ }
+
@Override public IdealState getResourceIdealState(String clusterName, String
resourceName) {
return null;
}
diff --git
a/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java
b/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java
index a9c26bf..8d6b0a2 100644
--- a/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java
+++ b/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java
@@ -324,21 +324,21 @@ public class TestClusterConfig {
@Test
public void testGetOfflineNodeTimeOutForPurge() {
ClusterConfig testConfig = new ClusterConfig("testId");
- Assert.assertEquals(testConfig.getOfflineNodeTimeOutForPurge(), -1);
+ Assert.assertEquals(testConfig.getOfflineDurationForPurge(), -1);
testConfig.getRecord()
-
.setLongField(ClusterConfig.ClusterConfigProperty.OFFLINE_NODE_TIME_OUT_FOR_PURGE
+
.setLongField(ClusterConfig.ClusterConfigProperty.OFFLINE_DURATION_FOR_PURGE_MS
.name(),
10000L);
- Assert.assertEquals(testConfig.getOfflineNodeTimeOutForPurge(), 10000L);
+ Assert.assertEquals(testConfig.getOfflineDurationForPurge(), 10000L);
}
@Test
public void testSetOfflineNodeTimeOutForPurge() {
ClusterConfig testConfig = new ClusterConfig("testId");
- testConfig.setOfflineNodeTimeOutForPurge(10000L);
+ testConfig.setOfflineDurationForPurge(10000L);
Assert.assertEquals(testConfig.getRecord()
-
.getLongField(ClusterConfig.ClusterConfigProperty.OFFLINE_NODE_TIME_OUT_FOR_PURGE
+
.getLongField(ClusterConfig.ClusterConfigProperty.OFFLINE_DURATION_FOR_PURGE_MS
.name(),
-1), 10000L);
}
diff --git
a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
index 5101b22..ac90227 100644
---
a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
+++
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
@@ -77,6 +77,7 @@ public class AbstractResource {
validateWeight,
enableWagedRebalance,
enableWagedRebalanceForAllResources,
+ purgeOfflineParticipants,
getInstance,
getAllInstances
}
diff --git
a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java
index c0fa271..a16821f 100644
---
a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java
+++
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java
@@ -203,7 +203,7 @@ public class ClusterAccessor extends AbstractHelixResource {
@Path("{clusterId}")
public Response updateCluster(@PathParam("clusterId") String clusterId,
@QueryParam("command") String commandStr, @QueryParam("superCluster")
String superCluster,
- String content) {
+ @QueryParam("duration") Long duration, String content) {
Command command;
try {
command = getCommand(commandStr);
@@ -282,6 +282,14 @@ public class ClusterAccessor extends AbstractHelixResource
{
return badRequest(e.getMessage());
}
break;
+ case purgeOfflineParticipants:
+ if (duration == null || duration < 0) {
+ helixAdmin
+ .purgeOfflineInstances(clusterId,
ClusterConfig.OFFLINE_DURATION_FOR_PURGE_NOT_SET);
+ } else {
+ helixAdmin.purgeOfflineInstances(clusterId, duration);
+ }
+ break;
default:
return badRequest("Unsupported command {}." + command);
}
diff --git
a/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java
b/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java
index c7ade9a..62cc1ef 100644
---
a/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java
+++
b/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java
@@ -518,6 +518,51 @@ public class TestClusterAccessor extends AbstractTestClass
{
}
+ /**
+ * Verifies the purgeOfflineParticipants REST command. With a very large explicit duration
+ * (100000000 ms), the three offline instances are kept. After OFFLINE_DURATION_FOR_PURGE_MS is set
+ * to 100 ms in the cluster config, a purge without the "duration" parameter drops them. The
+ * instances are re-added at the end to restore the previous cluster state.
+ */
@Test(dependsOnMethods = "testEnableDisableMaintenanceModeWithCustomFields")
+ public void testPurgeOfflineParticipants() throws IOException {
+ System.out.println("Start test :" + TestHelper.getTestMethodName());
+ String cluster = _clusters.iterator().next();
+ HelixDataAccessor accessor = new ZKHelixDataAccessor(cluster,
_baseAccessor);
+
+ String instance1 = cluster + "localhost_12923";
+ String instance2 = cluster + "localhost_12924";
+ String instance3 = cluster + "localhost_12926";
+ post("clusters/" + cluster,
+ ImmutableMap.of("command", "purgeOfflineParticipants", "duration",
"100000000"), null,
+ Response.Status.OK.getStatusCode());
+
+ // Although the three instances are not live instances, the customized 100000000 ms duration
+ // is not met, so the purge must not drop them.
+ Assert.assertTrue(accessor.getBaseDataAccessor()
+ .exists(accessor.keyBuilder().instanceConfig(instance1).getPath(), 0));
+ Assert.assertTrue(accessor.getBaseDataAccessor()
+ .exists(accessor.keyBuilder().instanceConfig(instance2).getPath(), 0));
+ Assert.assertTrue(accessor.getBaseDataAccessor()
+ .exists(accessor.keyBuilder().instanceConfig(instance3).getPath(), 0));
+
+ // Set a 100 ms default purge duration in the cluster config through the REST update command.
+ ClusterConfig configDelta = new ClusterConfig(cluster);
+ configDelta.getRecord()
+
.setSimpleField(ClusterConfig.ClusterConfigProperty.OFFLINE_DURATION_FOR_PURGE_MS.name(),
+ "100");
+ updateClusterConfigFromRest(cluster, configDelta, Command.update);
+
+ // Purge again without a customized duration; the action falls back to the cluster-config
+ // default of 100 ms, so all three instances are expected to be dropped.
+ post("clusters/" + cluster, ImmutableMap.of("command",
"purgeOfflineParticipants"), null,
+ Response.Status.OK.getStatusCode());
+ Assert.assertFalse(accessor.getBaseDataAccessor()
+ .exists(accessor.keyBuilder().instanceConfig(instance1).getPath(), 0));
+ Assert.assertFalse(accessor.getBaseDataAccessor()
+ .exists(accessor.keyBuilder().instanceConfig(instance2).getPath(), 0));
+ Assert.assertFalse(accessor.getBaseDataAccessor()
+ .exists(accessor.keyBuilder().instanceConfig(instance3).getPath(), 0));
+
+ // Re-add the dropped instances to restore the previous cluster state.
+ _gSetupTool.addInstanceToCluster(cluster, instance1);
+ _gSetupTool.addInstanceToCluster(cluster, instance2);
+ _gSetupTool.addInstanceToCluster(cluster, instance3);
+ System.out.println("End test :" + TestHelper.getTestMethodName());
+ }
+
+ @Test(dependsOnMethods = "testEnableDisableMaintenanceModeWithCustomFields")
public void testGetStateModelDef() throws IOException {
System.out.println("Start test :" + TestHelper.getTestMethodName());