AMBARI-21413 Move the Log Search ZK config root to the connect string (mgergely)
Change-Id: Ia50439cf278556b5bf862c996644f60f3a826b32 Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/8e719f79 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/8e719f79 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/8e719f79 Branch: refs/heads/branch-feature-AMBARI-14714 Commit: 8e719f79402c10d529d2006702148acb085bccfe Parents: 4256067 Author: Miklos Gergely <mgerg...@hortonworks.com> Authored: Fri Jul 7 01:54:18 2017 +0200 Committer: Miklos Gergely <mgerg...@hortonworks.com> Committed: Fri Jul 7 01:54:18 2017 +0200 ---------------------------------------------------------------------- .../config/zookeeper/LogSearchConfigZK.java | 39 ++++++++++---------- 1 file changed, 19 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/8e719f79/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java index 6d36203..fdd8ed6 100644 --- a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java +++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java @@ -97,7 +97,6 @@ public class LogSearchConfigZK implements LogSearchConfig { private static final String ZK_ROOT_NODE_PROPERTY = "logsearch.config.zk_root"; private Map<String, String> properties; - private String root; private CuratorFramework client; private TreeCache cache; private Gson gson; @@ -106,29 +105,29 @@ public class LogSearchConfigZK implements LogSearchConfig { public void init(Component component, Map<String, String> properties, String clusterName) throws Exception { this.properties = properties; - LOG.info("Connecting to ZooKeeper at " + properties.get(ZK_CONNECT_STRING_PROPERTY)); + String root = MapUtils.getString(properties, ZK_ROOT_NODE_PROPERTY, DEFAULT_ZK_ROOT); + LOG.info("Connecting to ZooKeeper at " + properties.get(ZK_CONNECT_STRING_PROPERTY) + root); client = CuratorFrameworkFactory.builder() - .connectString(properties.get(ZK_CONNECT_STRING_PROPERTY)) + .connectString(properties.get(ZK_CONNECT_STRING_PROPERTY) + root) .retryPolicy(new ExponentialBackoffRetry(1000, 3)) .connectionTimeoutMs(CONNECTION_TIMEOUT) .sessionTimeoutMs(SESSION_TIMEOUT) .build(); client.start(); - root = MapUtils.getString(properties, ZK_ROOT_NODE_PROPERTY, DEFAULT_ZK_ROOT); if (component == Component.SERVER) { - if (client.checkExists().forPath(root) == null) { - client.create().creatingParentContainersIfNeeded().forPath(root); + if (client.checkExists().forPath("/") == null) { + client.create().creatingParentContainersIfNeeded().forPath("/"); } - cache = new TreeCache(client, root); + cache = new TreeCache(client, "/"); cache.start(); } else { - while (client.checkExists().forPath(root) == null) { + while (client.checkExists().forPath("/") == null) { LOG.info("Root node is not present yet, going to sleep for " + WAIT_FOR_ROOT_SLEEP_SECONDS + " seconds"); Thread.sleep(WAIT_FOR_ROOT_SLEEP_SECONDS * 1000); } - cache = new TreeCache(client, String.format("%s/%s", root, clusterName)); + cache = new TreeCache(client, String.format("/%s", clusterName)); } gson = new GsonBuilder().setDateFormat(DATE_FORMAT).create(); @@ -136,13 +135,13 @@ public class LogSearchConfigZK implements LogSearchConfig { @Override public boolean inputConfigExists(String clusterName, String serviceName) throws Exception { - String nodePath = root + "/" + clusterName + "/input/" + serviceName; + String nodePath = String.format("/%s/input/%s", clusterName, serviceName); return cache.getCurrentData(nodePath) != null; } @Override public void createInputConfig(String clusterName, String serviceName, String inputConfig) throws Exception { - String nodePath = String.format("%s/%s/input/%s", root, clusterName, serviceName); + String nodePath = String.format("/%s/input/%s", clusterName, serviceName); try { client.create().creatingParentContainersIfNeeded().withACL(getAcls()).forPath(nodePath, inputConfig.getBytes()); LOG.info("Uploaded input config for the service " + serviceName + " for cluster " + clusterName); @@ -153,7 +152,7 @@ public class LogSearchConfigZK implements LogSearchConfig { @Override public void setInputConfig(String clusterName, String serviceName, String inputConfig) throws Exception { - String nodePath = String.format("%s/%s/input/%s", root, clusterName, serviceName); + String nodePath = String.format("/%s/input/%s", clusterName, serviceName); client.setData().forPath(nodePath, inputConfig.getBytes()); LOG.info("Set input config for the service " + serviceName + " for cluster " + clusterName); } @@ -182,7 +181,7 @@ public class LogSearchConfigZK implements LogSearchConfig { String nodeData = new String(event.getData().getData()); Type eventType = event.getType(); - String configPathStab = String.format("%s/%s/", root, clusterName); + String configPathStab = String.format("/%s/", clusterName); if (event.getData().getPath().startsWith(configPathStab + "input/")) { handleInputConfigChange(eventType, nodeName, nodeData); @@ -267,7 +266,7 @@ public class LogSearchConfigZK implements LogSearchConfig { } private void createGlobalConfigNode(JsonArray globalConfigNode, String clusterName) { - String globalConfigNodePath = String.format("%s/%s/global", root, clusterName); + String globalConfigNodePath = String.format("/%s/global", clusterName); String data = InputConfigGson.gson.toJson(globalConfigNode); try { @@ -283,14 +282,14 @@ public class LogSearchConfigZK implements LogSearchConfig { @Override public List<String> getServices(String clusterName) { - String parentPath = String.format("%s/%s/input", root, clusterName); + String parentPath = String.format("/%s/input", clusterName); Map<String, ChildData> serviceNodes = cache.getCurrentChildren(parentPath); return new ArrayList<String>(serviceNodes.keySet()); } @Override public String getGlobalConfigs(String clusterName) { - String globalConfigNodePath = String.format("%s/%s/global", root, clusterName); + String globalConfigNodePath = String.format("/%s/global", clusterName); return new String(cache.getCurrentData(globalConfigNodePath).getData()); } @@ -300,13 +299,13 @@ public class LogSearchConfigZK implements LogSearchConfig { JsonArray globalConfigs = (JsonArray) new JsonParser().parse(globalConfigData); InputAdapter.setGlobalConfigs(globalConfigs); - ChildData childData = cache.getCurrentData(String.format("%s/%s/input/%s", root, clusterName, serviceName)); + ChildData childData = cache.getCurrentData(String.format("/%s/input/%s", clusterName, serviceName)); return childData == null ? null : InputConfigGson.gson.fromJson(new String(childData.getData()), InputConfigImpl.class); } @Override public void createLogLevelFilter(String clusterName, String logId, LogLevelFilter filter) throws Exception { - String nodePath = String.format("%s/%s/loglevelfilter/%s", root, clusterName, logId); + String nodePath = String.format("/%s/loglevelfilter/%s", clusterName, logId); String logLevelFilterJson = gson.toJson(filter); try { client.create().creatingParentContainersIfNeeded().withACL(getAcls()).forPath(nodePath, logLevelFilterJson.getBytes()); @@ -319,7 +318,7 @@ public class LogSearchConfigZK implements LogSearchConfig { @Override public void setLogLevelFilters(String clusterName, LogLevelFilterMap filters) throws Exception { for (Map.Entry<String, LogLevelFilter> e : filters.getFilter().entrySet()) { - String nodePath = String.format("%s/%s/loglevelfilter/%s", root, clusterName, e.getKey()); + String nodePath = String.format("/%s/loglevelfilter/%s", clusterName, e.getKey()); String logLevelFilterJson = gson.toJson(e.getValue()); String currentLogLevelFilterJson = new String(cache.getCurrentData(nodePath).getData()); if (!logLevelFilterJson.equals(currentLogLevelFilterJson)) { @@ -331,7 +330,7 @@ public class LogSearchConfigZK implements LogSearchConfig { @Override public LogLevelFilterMap getLogLevelFilters(String clusterName) { - String parentPath = String.format("%s/%s/loglevelfilter", root, clusterName); + String parentPath = String.format("/%s/loglevelfilter", clusterName); Map<String, ChildData> logLevelFilterNodes = cache.getCurrentChildren(parentPath); TreeMap<String, LogLevelFilter> filters = new TreeMap<>(); for (Map.Entry<String, ChildData> e : logLevelFilterNodes.entrySet()) {