Repository: helix Updated Branches: refs/heads/master bd171f26d -> cbd3bd991
Change current state clean to user defined session Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/cbd3bd99 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/cbd3bd99 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/cbd3bd99 Branch: refs/heads/master Commit: cbd3bd991bf8d4cd8be9ee9e591549d26d469e16 Parents: bd171f2 Author: Junkai Xue <[email protected]> Authored: Fri Jun 22 14:30:24 2018 -0700 Committer: Junkai Xue <[email protected]> Committed: Mon Jul 16 15:17:36 2018 -0700 ---------------------------------------------------------------------- .../tools/commandtools/CurrentStateCleanUp.java | 71 +++++++++----------- 1 file changed, 33 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/cbd3bd99/helix-core/src/main/java/org/apache/helix/tools/commandtools/CurrentStateCleanUp.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/tools/commandtools/CurrentStateCleanUp.java b/helix-core/src/main/java/org/apache/helix/tools/commandtools/CurrentStateCleanUp.java index a8d2bae..b2ceed7 100644 --- a/helix-core/src/main/java/org/apache/helix/tools/commandtools/CurrentStateCleanUp.java +++ b/helix-core/src/main/java/org/apache/helix/tools/commandtools/CurrentStateCleanUp.java @@ -31,7 +31,8 @@ public class CurrentStateCleanUp { public static final String zkServer = "zkSvr"; public static final String cluster = "cluster"; - public static final String instances = "instances"; + public static final String instance = "instance"; + public static final String session = "session"; public static final String help = "help"; private static Options parseCommandLineOptions() { @@ -51,11 +52,17 @@ public class CurrentStateCleanUp { clusterOption.setRequired(true); clusterOption.setArgName("Cluster name (Required)"); - Option instancesOption = OptionBuilder.withLongOpt(instances) - .withDescription("Provide instance names, separated by ','").create(); - instancesOption.setArgs(1); - instancesOption.setRequired(true); - instancesOption.setArgName("Instance names (Required)"); + Option instanceOption = OptionBuilder.withLongOpt(instance) + .withDescription("Provide instance name").create(); + instanceOption.setArgs(1); + instanceOption.setRequired(true); + instanceOption.setArgName("Instance name"); + + Option sessionOption = OptionBuilder.withLongOpt(session) + .withDescription("Provide instance session").create(); + sessionOption.setArgs(1); + sessionOption.setRequired(true); + sessionOption.setArgName("Session name"); OptionGroup optionGroup = new OptionGroup(); optionGroup.addOption(zkServerOption); @@ -63,7 +70,8 @@ public class CurrentStateCleanUp { Options options = new Options(); options.addOption(helpOption); options.addOption(clusterOption); - options.addOption(instancesOption); + options.addOption(instanceOption); + options.addOption(sessionOption); options.addOptionGroup(optionGroup); @@ -71,23 +79,16 @@ public class CurrentStateCleanUp { } public static void cleanupCurrentStatesForCluster(String zkConnectString, String clusterName, - List<String> instanceNames) throws Exception { + String instanceName, String session) throws Exception { HelixManager manager = HelixManagerFactory .getZKHelixManager(clusterName, "Administorator", InstanceType.ADMINISTRATOR, zkConnectString); manager.connect(); try { HelixDataAccessor accessor = manager.getHelixDataAccessor(); - List<LiveInstance> liveInstances = - accessor.getChildValues(accessor.keyBuilder().liveInstances()); - Map<String, LiveInstance> liveInstanceMap = new HashMap<>(); - for (LiveInstance liveInstance : liveInstances) { - liveInstanceMap.put(liveInstance.getInstanceName(), liveInstance); - } DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() { - @Override - public ZNRecord update(ZNRecord currentData) { + @Override public ZNRecord update(ZNRecord currentData) { Set<String> partitionToRemove = new HashSet<>(); for (String partition : currentData.getMapFields().keySet()) { if (currentData.getMapField(partition).get("CURRENT_STATE") @@ -100,26 +101,20 @@ public class CurrentStateCleanUp { } }; - for (String instanceName : instanceNames) { - if (liveInstanceMap.containsKey(instanceName)) { - LOG.info( - String.format("Processing cleaning current state for instance: %s", instanceName)); - List<String> currentStateNames = accessor.getChildNames(accessor.keyBuilder() - .currentStates(instanceName, liveInstanceMap.get(instanceName).getSessionId())); - for (String currentStateName : currentStateNames) { - PropertyKey key = accessor.keyBuilder() - .currentState(instanceName, liveInstanceMap.get(instanceName).getSessionId(), - currentStateName); - accessor.getBaseDataAccessor().update(key.getPath(), updater, AccessOption.PERSISTENT); - CurrentState currentState = accessor.getProperty(key); - if (currentState.getPartitionStateMap().size() == 0) { - accessor.getBaseDataAccessor().remove(key.getPath(), AccessOption.PERSISTENT); - LOG.info(String - .format("Remove current state for instance: %s, resource %s", instanceName, - currentStateName)); - } - } + LOG.info(String.format("Processing cleaning current state for instance: %s", instanceName)); + List<String> currentStateNames = + accessor.getChildNames(accessor.keyBuilder().currentStates(instanceName, session)); + for (String currentStateName : currentStateNames) { + PropertyKey key = + accessor.keyBuilder().currentState(instanceName, session, currentStateName); + accessor.getBaseDataAccessor().update(key.getPath(), updater, AccessOption.PERSISTENT); + CurrentState currentState = accessor.getProperty(key); + if (currentState.getPartitionStateMap().size() == 0) { + accessor.getBaseDataAccessor().remove(key.getPath(), AccessOption.PERSISTENT); + LOG.info(String.format("Remove current state for instance: %s, resource %s", instanceName, + currentStateName)); } + } } catch (Exception e) { e.printStackTrace(); @@ -138,13 +133,13 @@ public class CurrentStateCleanUp { CommandLine cmd = ToolsUtil.processCommandLineArgs(args, parseCommandLineOptions()); String zkConnectString = cmd.getOptionValue(zkServer); String clusterName = cmd.getOptionValue(cluster); - String instance = cmd.getOptionValue(instances); - List<String> instanceNames = Arrays.asList(instance.split(",")); + String instanceName = cmd.getOptionValue(instance); + String sessionId = cmd.getOptionValue(session); LOG.info(String .format("Starting cleaning current state with ZK: %s, cluster: %s", zkConnectString, clusterName)); - cleanupCurrentStatesForCluster(zkConnectString, clusterName, instanceNames); + cleanupCurrentStatesForCluster(zkConnectString, clusterName, instanceName, sessionId); } }
