This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 204f034  Add admin and Rest API to purge instances that have been offline more than specified period of time (#1494)
204f034 is described below

commit 204f034e4f8e5f49127b702b7a129892240c945c
Author: Meng Zhang <[email protected]>
AuthorDate: Sun Nov 1 22:12:50 2020 -0800

    Add admin and Rest API to purge instances that have been offline more than specified period of time (#1494)
    
    This commit adds admin and Rest API to purge instances that have been offline more than specified period of time
---
 .../src/main/java/org/apache/helix/HelixAdmin.java | 10 ++++
 .../org/apache/helix/manager/zk/ZKHelixAdmin.java  | 61 ++++++++++++++++++++++
 .../java/org/apache/helix/model/ClusterConfig.java | 28 +++++-----
 .../apache/helix/manager/zk/TestZkHelixAdmin.java  | 54 ++++++++++++++++++-
 .../java/org/apache/helix/mock/MockHelixAdmin.java |  4 ++
 .../org/apache/helix/model/TestClusterConfig.java  | 10 ++--
 .../rest/server/resources/AbstractResource.java    |  1 +
 .../server/resources/helix/ClusterAccessor.java    | 10 +++-
 .../helix/rest/server/TestClusterAccessor.java     | 45 ++++++++++++++++
 9 files changed, 203 insertions(+), 20 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
index 3618b12..3c456bb 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
@@ -229,6 +229,16 @@ public interface HelixAdmin {
   void dropInstance(String clusterName, InstanceConfig instanceConfig);
 
   /**
+   * Purge offline instances that have been offline for longer than the offline duration time
+   * from a cluster
+   * @param clusterName
+   * @param offlineDuration if an offline instance has been offline for longer than the set
+   *                        offlineDuration, the offline instance becomes eligible for being
+   *                        purged/deleted
+   */
+  void purgeOfflineInstances(String clusterName, long offlineDuration);
+
+  /**
    * Get ideal state for a resource
    * @param clusterName
    * @param resourceName
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index 1267161..7b946b1 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -76,6 +76,7 @@ import org.apache.helix.model.MaintenanceSignal;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.MessageState;
 import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.model.ParticipantHistory;
 import org.apache.helix.model.PauseSignal;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.StateModelDefinition;
@@ -250,6 +251,33 @@ public class ZKHelixAdmin implements HelixAdmin {
     }
   }
 
+  /**
+   * Please note that the purge function should only be called when there is no new instance
+   * joining happening in the cluster. The reason is that current implementation is not thread safe,
+   * meaning that if the offline instance comes online while the purging is ongoing, race
+   * condition may happen, and we may have live instance in the cluster without corresponding
+   * instance config.
+   * TODO: consider using Helix lock to prevent race condition, and make sure zookeeper is ok
+   *  with the extra traffic caused by lock.
+   */
+  @Override
+  public void purgeOfflineInstances(String clusterName, long offlineDuration) {
+    Map<String, InstanceConfig> timeoutOfflineInstances =
+        findTimeoutOfflineInstances(clusterName, offlineDuration);
+    List<String> failToPurgeInstances = new ArrayList<>();
+    timeoutOfflineInstances.values().forEach(instance -> {
+      try {
+        dropInstance(clusterName, instance);
+      } catch (HelixException e) {
+        failToPurgeInstances.add(instance.getInstanceName());
+      }
+    });
+    if (failToPurgeInstances.size() > 0) {
+      LOG.error("ZKHelixAdmin::purgeOfflineInstances(): failed to drop the following instances: "
+          + failToPurgeInstances);
+    }
+  }
+
   @Override
   public InstanceConfig getInstanceConfig(String clusterName, String instanceName) {
     logger.info("Get instance config for instance {} from cluster {}.", instanceName, clusterName);
@@ -2061,4 +2089,37 @@ public class ZKHelixAdmin implements HelixAdmin {
               _zkAddress), false);
     }
   }
+
+  private Map<String, InstanceConfig> findTimeoutOfflineInstances(String clusterName,
+      long offlineDuration) {
+    Map<String, InstanceConfig> instanceConfigMap = new HashMap<>();
+    // in case there is no customized timeout value, use the one defined in cluster config
+    if (offlineDuration == ClusterConfig.OFFLINE_DURATION_FOR_PURGE_NOT_SET) {
+      offlineDuration =
+          _configAccessor.getClusterConfig(clusterName).getOfflineDurationForPurge();
+      if (offlineDuration == ClusterConfig.OFFLINE_DURATION_FOR_PURGE_NOT_SET) {
+        return instanceConfigMap;
+      }
+    }
+
+    HelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    instanceConfigMap = accessor.getChildValuesMap(keyBuilder.instanceConfigs(), true);
+    List<String> liveNodes = accessor.getChildNames(keyBuilder.liveInstances());
+    instanceConfigMap.keySet().removeAll(liveNodes);
+
+    Set<String> toRemoveInstances = new HashSet<>();
+    for (String instanceName : instanceConfigMap.keySet()) {
+      ParticipantHistory participantHistory =
+          accessor.getProperty(keyBuilder.participantHistory(instanceName));
+      long lastOfflineTime = participantHistory.getLastOfflineTime();
+      if (lastOfflineTime == ClusterConfig.OFFLINE_DURATION_FOR_PURGE_NOT_SET
+          || System.currentTimeMillis() - lastOfflineTime < offlineDuration) {
+        toRemoveInstances.add(instanceName);
+      }
+    }
+    instanceConfigMap.keySet().removeAll(toRemoveInstances);
+    return instanceConfigMap;
+  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
index ef87542..492ac7f 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -134,7 +134,7 @@ public class ClusterConfig extends HelixProperty {
     // offline for more than this specified time period, and users call purge participant API,
     // then the node will be removed.
     // The unit is milliseconds.
-    OFFLINE_NODE_TIME_OUT_FOR_PURGE
+    OFFLINE_DURATION_FOR_PURGE_MS
   }
 
   public enum GlobalRebalancePreferenceKey {
@@ -153,6 +153,8 @@ public class ClusterConfig extends HelixProperty {
 
   public final static String TASK_QUOTA_RATIO_NOT_SET = "-1";
 
+  public static final long OFFLINE_DURATION_FOR_PURGE_NOT_SET = -1L;
+
   // Default preference for all the aspects should be the same to ensure balanced setup.
   public final static Map<GlobalRebalancePreferenceKey, Integer>
       DEFAULT_GLOBAL_REBALANCE_PREFERENCE =
@@ -163,7 +165,7 @@ public class ClusterConfig extends HelixProperty {
   private final static int MIN_REBALANCE_PREFERENCE = 0;
   public final static boolean DEFAULT_GLOBAL_REBALANCE_ASYNC_MODE_ENABLED = true;
   private static final int GLOBAL_TARGET_TASK_THREAD_POOL_SIZE_NOT_SET = -1;
-  private static final long VALUE_NOT_SET = -1;
+  private static final int OFFLINE_NODE_TIME_OUT_FOR_MAINTENANCE_MODE_NOT_SET = -1;
 
   /**
    * Instantiate for a specific cluster
@@ -940,28 +942,28 @@ public class ClusterConfig extends HelixProperty {
   public long getOfflineNodeTimeOutForMaintenanceMode() {
     return _record
         .getLongField(ClusterConfigProperty.OFFLINE_NODE_TIME_OUT_FOR_MAINTENANCE_MODE.name(),
-            VALUE_NOT_SET);
+            OFFLINE_NODE_TIME_OUT_FOR_MAINTENANCE_MODE_NOT_SET);
   }
 
   /**
-   * Set the default time out window for offline nodes to be purged. If an offline node has been
+   * Set the default duration for offline nodes to be purged. If an offline node has been
    * offline for more than this specified time period, when users call purge participants API,
    * the node will be dropped.
-   * @param timeOut timeout window in milliseconds.
+   * @param offlineDuration offline duration in milliseconds.
    */
-  public void setOfflineNodeTimeOutForPurge(long timeOut) {
-    _record.setLongField(ClusterConfigProperty.OFFLINE_NODE_TIME_OUT_FOR_PURGE.name(),
-        timeOut);
+  public void setOfflineDurationForPurge(long offlineDuration) {
+    _record.setLongField(ClusterConfigProperty.OFFLINE_DURATION_FOR_PURGE_MS.name(),
+        offlineDuration);
   }
 
   /**
-   * Get the default time out window for offline nodes to be purged.
-   * @return timeout window in milliseconds
+   * Get the default offline duration for offline nodes to be purged.
+   * @return offline duration in milliseconds
    */
-  public long getOfflineNodeTimeOutForPurge() {
+  public long getOfflineDurationForPurge() {
     return _record
-        .getLongField(ClusterConfigProperty.OFFLINE_NODE_TIME_OUT_FOR_PURGE.name(),
-            VALUE_NOT_SET);
+        .getLongField(ClusterConfigProperty.OFFLINE_DURATION_FOR_PURGE_MS.name(),
+            OFFLINE_DURATION_FOR_PURGE_NOT_SET);
   }
 
   /**
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
index 6585439..ce6ea9d 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
@@ -83,7 +83,6 @@ import org.testng.AssertJUnit;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-
 public class TestZkHelixAdmin extends ZkUnitTestBase {
   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
@@ -1008,4 +1007,57 @@ public class TestZkHelixAdmin extends ZkUnitTestBase {
     Assert.assertEquals(listTypesFromZk.get(0), "mockType2");
     Assert.assertEquals(listTypesFromZk.get(1), "mockType3");
   }
+
+  @Test
+  public void testPurgeOfflineInstances() {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    HelixAdmin tool = new ZKHelixAdmin(_gZkClient);
+    tool.addCluster(clusterName, true);
+
+    HelixDataAccessor dataAccessor = new ZKHelixDataAccessor(clusterName, _baseAccessor);
+    PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder();
+
+    // set default offline duration for purge in cluster config
+    ClusterConfig clusterConfig = dataAccessor.getProperty(keyBuilder.clusterConfig());
+    clusterConfig.setOfflineDurationForPurge(100000L);
+    dataAccessor.setProperty(keyBuilder.clusterConfig(), clusterConfig);
+
+    String hostname = "host1";
+    String port = "9999";
+    String instanceName = hostname + "_" + port;
+    InstanceConfig config = new InstanceConfig(instanceName);
+    config.setHostName(hostname);
+    config.setPort(port);
+    tool.addInstance(clusterName, config);
+    tool.enableInstance(clusterName, instanceName, true);
+
+    LiveInstance liveInstance = new LiveInstance(instanceName);
+    liveInstance.setSessionId(UUID.randomUUID().toString());
+    liveInstance.setHelixVersion(UUID.randomUUID().toString());
+    dataAccessor.setProperty(keyBuilder.liveInstance(instanceName), liveInstance);
+
+    dataAccessor.removeProperty(keyBuilder.liveInstance(instanceName));
+    ZNRecord znRecord = new ZNRecord(instanceName);
+    znRecord
+        .setSimpleField("LAST_OFFLINE_TIME", String.valueOf(System.currentTimeMillis() - 50000L));
+    _baseAccessor.set(PropertyPathBuilder.instanceHistory(clusterName, instanceName), znRecord, 1);
+
+    // This purge will not remove the instance since the default offline duration is not met yet.
+    tool.purgeOfflineInstances(clusterName, ClusterConfig.OFFLINE_DURATION_FOR_PURGE_NOT_SET);
+    Assert.assertTrue(_gZkClient.exists(keyBuilder.instanceConfig(instanceName).getPath()),
+        "Instance should still be there");
+
+    // This purge will remove the instance as the customized offline duration is met.
+    tool.purgeOfflineInstances(clusterName, 10000L);
+    Assert.assertFalse(_gZkClient.exists(keyBuilder.instanceConfig(instanceName).getPath()),
+        "Instance should already be dropped");
+
+    tool.dropCluster(clusterName);
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
index 44cdb0a..e954c93 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
@@ -238,6 +238,10 @@ public class MockHelixAdmin implements HelixAdmin {
 
   }
 
+  @Override
+  public void purgeOfflineInstances(String clusterName, long offlineDuration) {
+  }
+
   @Override public IdealState getResourceIdealState(String clusterName, String resourceName) {
     return null;
   }
diff --git a/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java b/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java
index a9c26bf..8d6b0a2 100644
--- a/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java
+++ b/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java
@@ -324,21 +324,21 @@ public class TestClusterConfig {
   @Test
   public void testGetOfflineNodeTimeOutForPurge() {
     ClusterConfig testConfig = new ClusterConfig("testId");
-    Assert.assertEquals(testConfig.getOfflineNodeTimeOutForPurge(), -1);
+    Assert.assertEquals(testConfig.getOfflineDurationForPurge(), -1);
 
     testConfig.getRecord()
-        .setLongField(ClusterConfig.ClusterConfigProperty.OFFLINE_NODE_TIME_OUT_FOR_PURGE
+        .setLongField(ClusterConfig.ClusterConfigProperty.OFFLINE_DURATION_FOR_PURGE_MS
                 .name(),
             10000L);
-    Assert.assertEquals(testConfig.getOfflineNodeTimeOutForPurge(), 10000L);
+    Assert.assertEquals(testConfig.getOfflineDurationForPurge(), 10000L);
   }
 
   @Test
   public void testSetOfflineNodeTimeOutForPurge() {
     ClusterConfig testConfig = new ClusterConfig("testId");
-    testConfig.setOfflineNodeTimeOutForPurge(10000L);
+    testConfig.setOfflineDurationForPurge(10000L);
     Assert.assertEquals(testConfig.getRecord()
-        .getLongField(ClusterConfig.ClusterConfigProperty.OFFLINE_NODE_TIME_OUT_FOR_PURGE
+        .getLongField(ClusterConfig.ClusterConfigProperty.OFFLINE_DURATION_FOR_PURGE_MS
                 .name(),
             -1), 10000L);
   }
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
index 5101b22..ac90227 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
@@ -77,6 +77,7 @@ public class AbstractResource {
     validateWeight,
     enableWagedRebalance,
     enableWagedRebalanceForAllResources,
+    purgeOfflineParticipants,
     getInstance,
     getAllInstances
   }
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java
index c0fa271..a16821f 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java
@@ -203,7 +203,7 @@ public class ClusterAccessor extends AbstractHelixResource {
   @Path("{clusterId}")
   public Response updateCluster(@PathParam("clusterId") String clusterId,
       @QueryParam("command") String commandStr, @QueryParam("superCluster") String superCluster,
-      String content) {
+      @QueryParam("duration") Long duration, String content) {
     Command command;
     try {
       command = getCommand(commandStr);
@@ -282,6 +282,14 @@ public class ClusterAccessor extends AbstractHelixResource {
           return badRequest(e.getMessage());
         }
         break;
+      case purgeOfflineParticipants:
+        if (duration == null || duration < 0) {
+          helixAdmin
+              .purgeOfflineInstances(clusterId, ClusterConfig.OFFLINE_DURATION_FOR_PURGE_NOT_SET);
+        } else {
+          helixAdmin.purgeOfflineInstances(clusterId, duration);
+        }
+        break;
       default:
         return badRequest("Unsupported command {}." + command);
     }
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java
index c7ade9a..62cc1ef 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java
@@ -518,6 +518,51 @@ public class TestClusterAccessor extends AbstractTestClass {
   }
 
   @Test(dependsOnMethods = "testEnableDisableMaintenanceModeWithCustomFields")
+  public void testPurgeOfflineParticipants() throws IOException {
+    System.out.println("Start test :" + TestHelper.getTestMethodName());
+    String cluster = _clusters.iterator().next();
+    HelixDataAccessor accessor = new ZKHelixDataAccessor(cluster, _baseAccessor);
+
+    String instance1 = cluster + "localhost_12923";
+    String instance2 = cluster + "localhost_12924";
+    String instance3 = cluster + "localhost_12926";
+    post("clusters/" + cluster,
+        ImmutableMap.of("command", "purgeOfflineParticipants", "duration", "100000000"), null,
+        Response.Status.OK.getStatusCode());
+
+    //Although the three instances are not in live instances, the timeout is not met, and
+    // instances will not be dropped by purging action
+    Assert.assertTrue(accessor.getBaseDataAccessor()
+        .exists(accessor.keyBuilder().instanceConfig(instance1).getPath(), 0));
+    Assert.assertTrue(accessor.getBaseDataAccessor()
+        .exists(accessor.keyBuilder().instanceConfig(instance2).getPath(), 0));
+    Assert.assertTrue(accessor.getBaseDataAccessor()
+        .exists(accessor.keyBuilder().instanceConfig(instance3).getPath(), 0));
+
+    ClusterConfig configDelta = new ClusterConfig(cluster);
+    configDelta.getRecord()
+        .setSimpleField(ClusterConfig.ClusterConfigProperty.OFFLINE_DURATION_FOR_PURGE_MS.name(),
+            "100");
+    updateClusterConfigFromRest(cluster, configDelta, Command.update);
+
+    //Purge again without customized timeout, and the action will use default timeout value.
+    post("clusters/" + cluster, ImmutableMap.of("command", "purgeOfflineParticipants"), null,
+        Response.Status.OK.getStatusCode());
+    Assert.assertFalse(accessor.getBaseDataAccessor()
+        .exists(accessor.keyBuilder().instanceConfig(instance1).getPath(), 0));
+    Assert.assertFalse(accessor.getBaseDataAccessor()
+        .exists(accessor.keyBuilder().instanceConfig(instance2).getPath(), 0));
+    Assert.assertFalse(accessor.getBaseDataAccessor()
+        .exists(accessor.keyBuilder().instanceConfig(instance3).getPath(), 0));
+
+    // reset cluster status to previous one
+    _gSetupTool.addInstanceToCluster(cluster, instance1);
+    _gSetupTool.addInstanceToCluster(cluster, instance2);
+    _gSetupTool.addInstanceToCluster(cluster, instance3);
+    System.out.println("End test :" + TestHelper.getTestMethodName());
+  }
+
+  @Test(dependsOnMethods = "testEnableDisableMaintenanceModeWithCustomFields")
   public void testGetStateModelDef() throws IOException {
 
     System.out.println("Start test :" + TestHelper.getTestMethodName());

Reply via email to