AMBARI-21623 Log Search Config should be separated into a Server and Log Feeder interface (mgergely)
Change-Id: Ie40cf3b57470c08375124d547dade2c9a3204e9f Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/2d1ac668 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/2d1ac668 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/2d1ac668 Branch: refs/heads/feature-branch-AMBARI-21307 Commit: 2d1ac668b6ae272481668d08f4b7bd1c1b99fc45 Parents: 27386c3 Author: Miklos Gergely <mgerg...@hortonworks.com> Authored: Fri Aug 4 11:27:57 2017 +0200 Committer: Miklos Gergely <mgerg...@hortonworks.com> Committed: Fri Aug 4 11:27:57 2017 +0200 ---------------------------------------------------------------------- .../logsearch/config/api/LogSearchConfig.java | 131 --------- .../config/api/LogSearchConfigFactory.java | 61 +++- .../config/api/LogSearchConfigLogFeeder.java | 77 +++++ .../config/api/LogSearchConfigServer.java | 111 +++++++ .../config/api/LogSearchConfigClass1.java | 95 ------ .../config/api/LogSearchConfigClass2.java | 95 ------ .../config/api/LogSearchConfigFactoryTest.java | 51 +++- .../api/LogSearchConfigLogFeederClass1.java | 58 ++++ .../api/LogSearchConfigLogFeederClass2.java | 58 ++++ .../config/api/LogSearchConfigServerClass1.java | 76 +++++ .../config/api/LogSearchConfigServerClass2.java | 76 +++++ .../zookeeper/LogSearchConfigLogFeederZK.java | 228 +++++++++++++++ .../zookeeper/LogSearchConfigServerZK.java | 138 +++++++++ .../config/zookeeper/LogSearchConfigZK.java | 291 +------------------ .../org/apache/ambari/logfeeder/LogFeeder.java | 11 +- .../ambari/logfeeder/common/ConfigHandler.java | 6 +- .../logfeeder/input/InputConfigUploader.java | 10 +- .../apache/ambari/logfeeder/output/Output.java | 6 +- .../ambari/logfeeder/output/OutputSolrTest.java | 6 +- .../configurer/LogSearchConfigConfigurer.java | 13 +- .../configurer/SolrCollectionConfigurer.java | 3 +- .../ambari/logsearch/dao/SolrDaoBase.java | 11 +- .../handler/CreateCollectionHandler.java | 7 +- .../logsearch/manager/ShipperConfigManager.java | 18 +- .../logsearch/model/common/LSServerFilter.java | 10 +- 25 files changed, 970 insertions(+), 677 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/2d1ac668/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java index 76be392..6c3b910 100644 --- a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java +++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java @@ -20,80 +20,14 @@ package org.apache.ambari.logsearch.config.api; import java.io.Closeable; -import java.util.List; -import java.util.Map; import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter; -import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilterMap; -import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputSolrProperties; -import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig; /** * Log Search Configuration, which uploads, retrieves configurations, and monitors it's changes. */ public interface LogSearchConfig extends Closeable { /** - * Enumeration of the components of the Log Search service. - */ - public enum Component { - SERVER, LOGFEEDER; - } - - /** - * Initialization of the configuration. - * - * @param component The component which will use the configuration. - * @param properties The properties of that component. - * @param clusterName The name of the cluster, only need to be specified in LOGFEEDER mode (null for SERVER mode). - * @throws Exception - */ - void init(Component component, Map<String, String> properties, String clusterName) throws Exception; - - /** - * Returns all the service names with input configurations of a cluster. Will be used only in SERVER mode. - * - * @param clusterName The name of the cluster which's services are required. - * @return List of the service names. - */ - List<String> getServices(String clusterName); - - /** - * Checks if input configuration exists. Will be used only in LOGFEEDER mode. - * - * @param serviceName The name of the service looked for. - * @return If input configuration exists for the service. - * @throws Exception - */ - boolean inputConfigExistsLogFeeder(String serviceName) throws Exception; - - /** - * Checks if input configuration exists. Will be used only in SERVER mode. - * - * @param clusterName The name of the cluster where the service is looked for. - * @param serviceName The name of the service looked for. - * @return If input configuration exists for the service. - * @throws Exception - */ - boolean inputConfigExistsServer(String clusterName, String serviceName) throws Exception; - - /** - * Returns the global configurations of a cluster. Will be used only in SERVER mode. - * - * @param clusterName The name of the cluster where the service is looked for. - * @return The global configurations of the cluster if it exists, null otherwise. - */ - String getGlobalConfigs(String clusterName); - - /** - * Returns the input configuration of a service in a cluster. Will be used only in SERVER mode. - * - * @param clusterName The name of the cluster where the service is looked for. - * @param serviceName The name of the service looked for. - * @return The input configuration for the service if it exists, null otherwise. - */ - InputConfig getInputConfig(String clusterName, String serviceName); - - /** * Uploads the input configuration for a service in a cluster. * * @param clusterName The name of the cluster where the service is. @@ -104,16 +38,6 @@ public interface LogSearchConfig extends Closeable { void createInputConfig(String clusterName, String serviceName, String inputConfig) throws Exception; /** - * Modifies the input configuration for a service in a cluster. - * - * @param clusterName The name of the cluster where the service is. - * @param serviceName The name of the service of which's input configuration is uploaded. - * @param inputConfig The input configuration of the service. - * @throws Exception - */ - void setInputConfig(String clusterName, String serviceName, String inputConfig) throws Exception; - - /** * Uploads the log level filter of a log. * * @param clusterName The name of the cluster where the log is. @@ -122,59 +46,4 @@ public interface LogSearchConfig extends Closeable { * @throws Exception */ void createLogLevelFilter(String clusterName, String logId, LogLevelFilter filter) throws Exception; - - /** - * Modifies the log level filters for all the logs. - * - * @param clusterName The name of the cluster where the logs are. - * @param filters The log level filters to set. - * @throws Exception - */ - void setLogLevelFilters(String clusterName, LogLevelFilterMap filters) throws Exception; - - /** - * Returns the Log Level Filters of a cluster. - * - * @param clusterName The name of the cluster which's log level filters are required. - * @return All the log level filters of the cluster. - */ - LogLevelFilterMap getLogLevelFilters(String clusterName); - - /** - * Starts the monitoring of the input configurations, asynchronously. Will be used only in LOGFEEDER mode. - * - * @param inputConfigMonitor The input config monitor to call in case of an input config change. - * @param logLevelFilterMonitor The log level filter monitor to call in case of a log level filter change. - * @param clusterName The name of the cluster, only need to be specified in LOGFEEDER mode (null for SERVER mode). - * @throws Exception - */ - void monitorInputConfigChanges(InputConfigMonitor inputConfigMonitor, LogLevelFilterMonitor logLevelFilterMonitor, - String clusterName) throws Exception; - - /** - * Saves the properties of an Output Solr. Will be used only in SERVER mode. - * - * @param type The type of the Output Solr. - * @param outputSolrProperties The properties of the Output Solr. - * @throws Exception - */ - void saveOutputSolrProperties(String type, OutputSolrProperties outputSolrProperties) throws Exception; - - /** - * Get the properties of an Output Solr. Will be used only in LOGFEEDER mode. - * - * @param type The type of the Output Solr. - * @return The properties of the Output Solr, or null if it doesn't exist. - * @throws Exception - */ - OutputSolrProperties getOutputSolrProperties(String type) throws Exception; - - /** - * Saves the properties of an Output Solr. Will be used only in LOGFEEDER mode. - * - * @param type The type of the Output Solr. - * @param outputConfigMonitors The monitors which want to watch the output config changes. - * @throws Exception - */ - void monitorOutputProperties(List<? extends OutputConfigMonitor> outputConfigMonitors) throws Exception; } http://git-wip-us.apache.org/repos/asf/ambari/blob/2d1ac668/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactory.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactory.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactory.java index 77b48eb..a84a97b 100644 --- a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactory.java +++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactory.java @@ -21,46 +21,81 @@ package org.apache.ambari.logsearch.config.api; import java.util.Map; -import org.apache.ambari.logsearch.config.api.LogSearchConfig.Component; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Factory class for LogSearchConfig. + * Factory class for LogSearchConfigServer and LogSearchConfigLogFeeder. */ public class LogSearchConfigFactory { private static final Logger LOG = LoggerFactory.getLogger(LogSearchConfigFactory.class); /** - * Creates a Log Search Configuration instance that implements {@link org.apache.ambari.logsearch.config.api.LogSearchConfig}. + * Creates a Log Search Configuration instance for the Log Search Server that implements + * {@link org.apache.ambari.logsearch.config.api.LogSearchConfigServer}. * - * @param component The component of the Log Search Service to create the configuration for (SERVER/LOGFEEDER). * @param properties The properties of the component for which the configuration is created. If the properties contain the * "logsearch.config.class" entry than the class defined there would be used instead of the default class. - * @param clusterName The name of the cluster, only need to be specified in LOGFEEDER mode (null for SERVER mode). * @param defaultClass The default configuration class to use if not specified otherwise. * @return The Log Search Configuration instance. - * @throws Exception Throws exception if the defined class does not implement LogSearchConfig, or doesn't have an empty + * @throws Exception Throws exception if the defined class does not implement LogSearchConfigServer, or doesn't have an empty * constructor, or throws an exception in it's init method. */ - public static LogSearchConfig createLogSearchConfig(Component component, Map<String, String> properties, String clusterName, - Class<? extends LogSearchConfig> defaultClass) throws Exception { + public static LogSearchConfigServer createLogSearchConfigServer(Map<String, String> properties, + Class<? extends LogSearchConfigServer> defaultClass) throws Exception { try { - LogSearchConfig logSearchConfig = null; - String configClassName = properties.get("logsearch.config.class"); + LogSearchConfigServer logSearchConfig = null; + String configClassName = properties.get("logsearch.config.server.class"); + if (configClassName != null && !"".equals(configClassName.trim())) { + Class<?> clazz = Class.forName(configClassName); + if (LogSearchConfigServer.class.isAssignableFrom(clazz)) { + logSearchConfig = (LogSearchConfigServer) clazz.newInstance(); + } else { + throw new IllegalArgumentException("Class " + configClassName + " does not implement the interface " + + LogSearchConfigServer.class.getName()); + } + } else { + logSearchConfig = defaultClass.newInstance(); + } + + logSearchConfig.init(properties); + return logSearchConfig; + } catch (Exception e) { + LOG.error("Could not initialize logsearch config.", e); + throw e; + } + } + + /** + * Creates a Log Search Configuration instance for the Log Search Server that implements + * {@link org.apache.ambari.logsearch.config.api.LogSearchConfigLogFeeder}. + * + * @param properties The properties of the component for which the configuration is created. If the properties contain the + * "logsearch.config.class" entry than the class defined there would be used instead of the default class. + * @param clusterName The name of the cluster. + * @param defaultClass The default configuration class to use if not specified otherwise. + * @return The Log Search Configuration instance. + * @throws Exception Throws exception if the defined class does not implement LogSearchConfigLogFeeder, or doesn't have an empty + * constructor, or throws an exception in it's init method. + */ + public static LogSearchConfigLogFeeder createLogSearchConfigLogFeeder(Map<String, String> properties, String clusterName, + Class<? extends LogSearchConfigLogFeeder> defaultClass) throws Exception { + try { + LogSearchConfigLogFeeder logSearchConfig = null; + String configClassName = properties.get("logsearch.config.logfeeder.class"); if (configClassName != null && !"".equals(configClassName.trim())) { Class<?> clazz = Class.forName(configClassName); if (LogSearchConfig.class.isAssignableFrom(clazz)) { - logSearchConfig = (LogSearchConfig) clazz.newInstance(); + logSearchConfig = (LogSearchConfigLogFeeder) clazz.newInstance(); } else { throw new IllegalArgumentException("Class " + configClassName + " does not implement the interface " + - LogSearchConfig.class.getName()); + LogSearchConfigLogFeeder.class.getName()); } } else { logSearchConfig = defaultClass.newInstance(); } - logSearchConfig.init(component, properties, clusterName); + logSearchConfig.init(properties, clusterName); return logSearchConfig; } catch (Exception e) { LOG.error("Could not initialize logsearch config.", e); http://git-wip-us.apache.org/repos/asf/ambari/blob/2d1ac668/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigLogFeeder.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigLogFeeder.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigLogFeeder.java new file mode 100644 index 0000000..6ed36fd --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigLogFeeder.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ambari.logsearch.config.api; + +import java.util.List; +import java.util.Map; + +import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputSolrProperties; + +/** + * Log Search Configuration for Log Feeder. + */ +public interface LogSearchConfigLogFeeder extends LogSearchConfig { + /** + * Initialization of the configuration. + * + * @param properties The properties of that component. + * @param clusterName The name of the cluster. + * @throws Exception + */ + void init(Map<String, String> properties, String clusterName) throws Exception; + + /** + * Checks if input configuration exists. + * + * @param serviceName The name of the service looked for. + * @return If input configuration exists for the service. + * @throws Exception + */ + boolean inputConfigExists(String serviceName) throws Exception; + + /** + * Starts the monitoring of the input configurations, asynchronously. + * + * @param inputConfigMonitor The input config monitor to call in case of an input config change. + * @param logLevelFilterMonitor The log level filter monitor to call in case of a log level filter change. + * @param clusterName The name of the cluster. + * @throws Exception + */ + void monitorInputConfigChanges(InputConfigMonitor inputConfigMonitor, LogLevelFilterMonitor logLevelFilterMonitor, + String clusterName) throws Exception; + + /** + * Get the properties of an Output Solr. + * + * @param type The type of the Output Solr. + * @return The properties of the Output Solr, or null if it doesn't exist. + * @throws Exception + */ + OutputSolrProperties getOutputSolrProperties(String type) throws Exception; + + /** + * Saves the properties of an Output Solr. + * + * @param type The type of the Output Solr. + * @param outputConfigMonitors The monitors which want to watch the output config changes. + * @throws Exception + */ + void monitorOutputProperties(List<? extends OutputConfigMonitor> outputConfigMonitors) throws Exception; +} http://git-wip-us.apache.org/repos/asf/ambari/blob/2d1ac668/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigServer.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigServer.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigServer.java new file mode 100644 index 0000000..26889be --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigServer.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ambari.logsearch.config.api; + +import java.util.List; +import java.util.Map; + +import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilterMap; +import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputSolrProperties; +import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig; + +/** + * Log Search Configuration for Log Search Server. + */ +public interface LogSearchConfigServer extends LogSearchConfig { + /** + * Initialization of the configuration. + * + * @param properties The properties of that component. + * @throws Exception + */ + void init(Map<String, String> properties) throws Exception; + + /** + * Returns all the service names with input configurations of a cluster. + * + * @param clusterName The name of the cluster which's services are required. + * @return List of the service names. + */ + List<String> getServices(String clusterName); + + /** + * Checks if input configuration exists. + * + * @param clusterName The name of the cluster where the service is looked for. + * @param serviceName The name of the service looked for. + * @return If input configuration exists for the service. + * @throws Exception + */ + boolean inputConfigExists(String clusterName, String serviceName) throws Exception; + + /** + * Returns the global configurations of a cluster. + * + * @param clusterName The name of the cluster where the service is looked for. + * @return The global configurations of the cluster if it exists, null otherwise. + */ + String getGlobalConfigs(String clusterName); + + /** + * Modifies the input configuration for a service in a cluster. + * + * @param clusterName The name of the cluster where the service is. + * @param serviceName The name of the service of which's input configuration is uploaded. + * @param inputConfig The input configuration of the service. + * @throws Exception + */ + void setInputConfig(String clusterName, String serviceName, String inputConfig) throws Exception; + + /** + * Returns the input configuration of a service in a cluster. + * + * @param clusterName The name of the cluster where the service is looked for. + * @param serviceName The name of the service looked for. + * @return The input configuration for the service if it exists, null otherwise. + */ + InputConfig getInputConfig(String clusterName, String serviceName); + + /** + * Modifies the log level filters for all the logs. + * + * @param clusterName The name of the cluster where the logs are. + * @param filters The log level filters to set. + * @throws Exception + */ + void setLogLevelFilters(String clusterName, LogLevelFilterMap filters) throws Exception; + + /** + * Returns the Log Level Filters of a cluster. + * + * @param clusterName The name of the cluster which's log level filters are required. + * @return All the log level filters of the cluster. + */ + LogLevelFilterMap getLogLevelFilters(String clusterName); + + /** + * Saves the properties of an Output Solr. + * + * @param type The type of the Output Solr. + * @param outputSolrProperties The properties of the Output Solr. + * @throws Exception + */ + void saveOutputSolrProperties(String type, OutputSolrProperties outputSolrProperties) throws Exception; +} http://git-wip-us.apache.org/repos/asf/ambari/blob/2d1ac668/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass1.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass1.java b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass1.java deleted file mode 100644 index e308346..0000000 --- a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass1.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.ambari.logsearch.config.api; - -import java.util.List; -import java.util.Map; - -import org.apache.ambari.logsearch.config.api.InputConfigMonitor; -import org.apache.ambari.logsearch.config.api.LogSearchConfig; -import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter; -import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilterMap; -import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputSolrProperties; -import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig; - -public class LogSearchConfigClass1 implements LogSearchConfig { - @Override - public void init(Component component, Map<String, String> properties, String clusterName) {} - - @Override - public boolean inputConfigExistsLogFeeder(String serviceName) throws Exception { - return false; - } - - @Override - public boolean inputConfigExistsServer(String clusterName, String serviceName) throws Exception { - return false; - } - - @Override - public void createInputConfig(String clusterName, String serviceName, String inputConfig) throws Exception {} - - @Override - public void setInputConfig(String clusterName, String serviceName, String inputConfig) throws Exception {} - - @Override - public void monitorInputConfigChanges(InputConfigMonitor inputConfigMonitor, LogLevelFilterMonitor logLevelFilterMonitor, - String clusterName) throws Exception {} - - @Override - public List<String> getServices(String clusterName) { - return null; - } - - @Override - public String getGlobalConfigs(String clusterName) { - return null; - } - - @Override - public InputConfig getInputConfig(String clusterName, String serviceName) { - return null; - } - - @Override - public void createLogLevelFilter(String clusterName, String logId, LogLevelFilter filter) {} - - @Override - public void setLogLevelFilters(String clusterName, LogLevelFilterMap filters) throws Exception {} - - @Override - public LogLevelFilterMap getLogLevelFilters(String clusterName) { - return null; - } - - @Override - public void saveOutputSolrProperties(String type, OutputSolrProperties outputSolrProperties) throws Exception {} - - @Override - public OutputSolrProperties getOutputSolrProperties(String type) { - return null; - } - - @Override - public void monitorOutputProperties(List<? extends OutputConfigMonitor> outputConfigMonitors) throws Exception {} - - @Override - public void close() {} -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/2d1ac668/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass2.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass2.java b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass2.java deleted file mode 100644 index b64dae8..0000000 --- a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass2.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.ambari.logsearch.config.api; - -import java.util.List; -import java.util.Map; - -import org.apache.ambari.logsearch.config.api.InputConfigMonitor; -import org.apache.ambari.logsearch.config.api.LogSearchConfig; -import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter; -import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilterMap; -import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputSolrProperties; -import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig; - -public class LogSearchConfigClass2 implements LogSearchConfig { - @Override - public void init(Component component, Map<String, String> properties, String clusterName) {} - - @Override - public boolean inputConfigExistsLogFeeder(String serviceName) throws Exception { - return false; - } - - @Override - public boolean inputConfigExistsServer(String clusterName, String serviceName) throws Exception { - return false; - } - - @Override - public void createInputConfig(String clusterName, String serviceName, String inputConfig) throws Exception {} - - @Override - public void setInputConfig(String clusterName, String serviceName, String inputConfig) throws Exception {} - - @Override - public void monitorInputConfigChanges(InputConfigMonitor inputConfigMonitor, LogLevelFilterMonitor logLevelFilterMonitor, - String clusterName) throws Exception {} - - @Override - public List<String> getServices(String clusterName) { - return null; - } - - @Override - public String getGlobalConfigs(String clusterName) { - return null; - } - - @Override - public InputConfig getInputConfig(String clusterName, String serviceName) { - return null; - } - - @Override - public void createLogLevelFilter(String clusterName, String logId, LogLevelFilter filter) {} - - @Override - public void setLogLevelFilters(String clusterName, LogLevelFilterMap filters) throws Exception {} - - @Override - public LogLevelFilterMap getLogLevelFilters(String clusterName) { - return null; - } - - @Override - public void saveOutputSolrProperties(String type, OutputSolrProperties outputSolrProperties) throws Exception {} - - @Override - public OutputSolrProperties getOutputSolrProperties(String type) { - return null; - } - - @Override - public void monitorOutputProperties(List<? extends OutputConfigMonitor> outputConfigMonitors) throws Exception {} - - @Override - public void close() {} -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/2d1ac668/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactoryTest.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactoryTest.java b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactoryTest.java index f990c5c..d0db87f 100644 --- a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactoryTest.java +++ b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactoryTest.java @@ -23,7 +23,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; -import org.apache.ambari.logsearch.config.api.LogSearchConfig.Component; import org.junit.Test; import junit.framework.Assert; @@ -31,28 +30,52 @@ import junit.framework.Assert; public class LogSearchConfigFactoryTest { @Test - public void testDefaultConfig() throws Exception { - LogSearchConfig config = LogSearchConfigFactory.createLogSearchConfig(Component.SERVER, - Collections.<String, String> emptyMap(), null, LogSearchConfigClass1.class); + public void testDefaultConfigServer() throws Exception { + LogSearchConfigServer config = LogSearchConfigFactory.createLogSearchConfigServer( Collections.<String, String> emptyMap(), + LogSearchConfigServerClass1.class); - Assert.assertSame(config.getClass(), LogSearchConfigClass1.class); + Assert.assertSame(config.getClass(), LogSearchConfigServerClass1.class); } @Test - public void testCustomConfig() throws Exception { + public void testCustomConfigServer() throws Exception { Map<String, String> logsearchConfClassMap = new HashMap<>(); - logsearchConfClassMap.put("logsearch.config.class", "org.apache.ambari.logsearch.config.api.LogSearchConfigClass2"); - LogSearchConfig config = LogSearchConfigFactory.createLogSearchConfig(Component.SERVER, - logsearchConfClassMap, null, LogSearchConfigClass1.class); + logsearchConfClassMap.put("logsearch.config.server.class", "org.apache.ambari.logsearch.config.api.LogSearchConfigServerClass2"); + LogSearchConfig config = LogSearchConfigFactory.createLogSearchConfigServer(logsearchConfClassMap, + LogSearchConfigServerClass1.class); - Assert.assertSame(config.getClass(), LogSearchConfigClass2.class); + Assert.assertSame(config.getClass(), LogSearchConfigServerClass2.class); } @Test(expected = IllegalArgumentException.class) - public void testNonConfigClass() throws Exception { + public void testNonConfigClassServer() throws Exception { Map<String, String> logsearchConfClassMap = new HashMap<>(); - logsearchConfClassMap.put("logsearch.config.class", "org.apache.ambari.logsearch.config.api.NonLogSearchConfigClass"); - LogSearchConfigFactory.createLogSearchConfig(Component.SERVER, - logsearchConfClassMap, null, LogSearchConfigClass1.class); + logsearchConfClassMap.put("logsearch.config.server.class", "org.apache.ambari.logsearch.config.api.NonLogSearchConfigClass"); + LogSearchConfigFactory.createLogSearchConfigServer(logsearchConfClassMap, LogSearchConfigServerClass1.class); + } + + @Test + public void testDefaultConfigLogFeeder() throws Exception { + LogSearchConfigLogFeeder config = LogSearchConfigFactory.createLogSearchConfigLogFeeder( Collections.<String, String> emptyMap(), + null, LogSearchConfigLogFeederClass1.class); + + Assert.assertSame(config.getClass(), LogSearchConfigLogFeederClass1.class); + } + + @Test + public void testCustomConfigLogFeeder() throws Exception { + Map<String, String> logsearchConfClassMap = new HashMap<>(); + logsearchConfClassMap.put("logsearch.config.logfeeder.class", "org.apache.ambari.logsearch.config.api.LogSearchConfigLogFeederClass2"); + LogSearchConfigLogFeeder config = LogSearchConfigFactory.createLogSearchConfigLogFeeder(logsearchConfClassMap, null, + LogSearchConfigLogFeederClass1.class); + + Assert.assertSame(config.getClass(), LogSearchConfigLogFeederClass2.class); + } + + @Test(expected = IllegalArgumentException.class) + public void testNonConfigClassLogFeeder() throws Exception { + Map<String, String> logsearchConfClassMap = new HashMap<>(); + logsearchConfClassMap.put("logsearch.config.logfeeder.class", "org.apache.ambari.logsearch.config.api.NonLogSearchConfigClass"); + LogSearchConfigFactory.createLogSearchConfigLogFeeder(logsearchConfClassMap, null, LogSearchConfigLogFeederClass1.class); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/2d1ac668/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigLogFeederClass1.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigLogFeederClass1.java b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigLogFeederClass1.java new file mode 100644 index 0000000..b7da7fe --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigLogFeederClass1.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ambari.logsearch.config.api; + +import java.util.List; +import java.util.Map; + +import org.apache.ambari.logsearch.config.api.InputConfigMonitor; +import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter; +import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputSolrProperties; + +public class LogSearchConfigLogFeederClass1 implements LogSearchConfigLogFeeder { + @Override + public void init(Map<String, String> properties, String clusterName) {} + + @Override + public boolean inputConfigExists(String serviceName) throws Exception { + return false; + } + + @Override + public void createInputConfig(String clusterName, String serviceName, String inputConfig) throws Exception {} + + @Override + public void monitorInputConfigChanges(InputConfigMonitor inputConfigMonitor, LogLevelFilterMonitor logLevelFilterMonitor, + String clusterName) throws Exception {} + + @Override + public void createLogLevelFilter(String clusterName, String logId, LogLevelFilter filter) {} + + @Override + public OutputSolrProperties getOutputSolrProperties(String type) { + return null; + } + + @Override + public void monitorOutputProperties(List<? extends OutputConfigMonitor> outputConfigMonitors) throws Exception {} + + @Override + public void close() {} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/2d1ac668/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigLogFeederClass2.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigLogFeederClass2.java b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigLogFeederClass2.java new file mode 100644 index 0000000..b703bf8 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigLogFeederClass2.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ambari.logsearch.config.api; + +import java.util.List; +import java.util.Map; + +import org.apache.ambari.logsearch.config.api.InputConfigMonitor; +import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter; +import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputSolrProperties; + +public class LogSearchConfigLogFeederClass2 implements LogSearchConfigLogFeeder { + @Override + public void init(Map<String, String> properties, String clusterName) {} + + @Override + public boolean inputConfigExists(String serviceName) throws Exception { + return false; + } + + @Override + public void createInputConfig(String clusterName, String serviceName, String inputConfig) throws Exception {} + + @Override + public void monitorInputConfigChanges(InputConfigMonitor inputConfigMonitor, LogLevelFilterMonitor logLevelFilterMonitor, + String clusterName) throws Exception {} + + @Override + public void createLogLevelFilter(String clusterName, String logId, LogLevelFilter filter) {} + + @Override + public OutputSolrProperties getOutputSolrProperties(String type) { + return null; + } + + @Override + public void monitorOutputProperties(List<? extends OutputConfigMonitor> outputConfigMonitors) throws Exception {} + + @Override + public void close() {} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/2d1ac668/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigServerClass1.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigServerClass1.java b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigServerClass1.java new file mode 100644 index 0000000..71e924a --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigServerClass1.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ambari.logsearch.config.api; + +import java.util.List; +import java.util.Map; + +import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter; +import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilterMap; +import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputSolrProperties; +import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig; + +public class LogSearchConfigServerClass1 implements LogSearchConfigServer { + @Override + public void init(Map<String, String> properties) {} + + @Override + public boolean inputConfigExists(String clusterName, String serviceName) throws Exception { + return false; + } + + @Override + public void createInputConfig(String clusterName, String serviceName, String inputConfig) throws Exception {} + + @Override + public void setInputConfig(String clusterName, String serviceName, String inputConfig) throws Exception {} + + @Override + public List<String> getServices(String clusterName) { + return null; + } + + @Override + public String getGlobalConfigs(String clusterName) { + return null; + } + + @Override + public InputConfig getInputConfig(String clusterName, String serviceName) { + return null; + } + + @Override + public void createLogLevelFilter(String clusterName, String logId, LogLevelFilter filter) {} + + @Override + public void setLogLevelFilters(String clusterName, LogLevelFilterMap filters) throws Exception {} + + @Override + public LogLevelFilterMap getLogLevelFilters(String clusterName) { + return null; + } + + @Override + public void saveOutputSolrProperties(String type, OutputSolrProperties outputSolrProperties) throws Exception {} + + @Override + public void close() {} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/2d1ac668/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigServerClass2.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigServerClass2.java b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigServerClass2.java new file mode 100644 index 0000000..a767ff5 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigServerClass2.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ambari.logsearch.config.api; + +import java.util.List; +import java.util.Map; + +import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter; +import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilterMap; +import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputSolrProperties; +import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig; + +public class LogSearchConfigServerClass2 implements LogSearchConfigServer { + @Override + public void init(Map<String, String> properties) {} + + @Override + public boolean inputConfigExists(String clusterName, String serviceName) throws Exception { + return false; + } + + @Override + public void createInputConfig(String clusterName, String serviceName, String inputConfig) throws Exception {} + + @Override + public void setInputConfig(String clusterName, String serviceName, String inputConfig) throws Exception {} + + @Override + public List<String> getServices(String clusterName) { + return null; + } + + @Override + public String getGlobalConfigs(String clusterName) { + return null; + } + + @Override + public InputConfig getInputConfig(String clusterName, String serviceName) { + return null; + } + + @Override + public void createLogLevelFilter(String clusterName, String logId, LogLevelFilter filter) {} + + @Override + public void setLogLevelFilters(String clusterName, LogLevelFilterMap filters) throws Exception {} + + @Override + public LogLevelFilterMap getLogLevelFilters(String clusterName) { + return null; + } + + @Override + public void saveOutputSolrProperties(String type, OutputSolrProperties outputSolrProperties) throws Exception {} + + @Override + public void close() {} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/2d1ac668/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigLogFeederZK.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigLogFeederZK.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigLogFeederZK.java new file mode 100644 index 0000000..c050540 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigLogFeederZK.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ambari.logsearch.config.zookeeper; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.ambari.logsearch.config.api.LogSearchConfigLogFeeder; +import org.apache.ambari.logsearch.config.api.OutputConfigMonitor; +import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter; +import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputSolrProperties; +import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputConfigGson; +import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputConfigImpl; +import org.apache.ambari.logsearch.config.zookeeper.model.outputconfig.impl.OutputSolrPropertiesImpl; +import org.apache.ambari.logsearch.config.api.InputConfigMonitor; +import org.apache.ambari.logsearch.config.api.LogLevelFilterMonitor; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.TreeCache; +import org.apache.curator.framework.recipes.cache.TreeCacheEvent; +import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type; +import org.apache.curator.framework.recipes.cache.TreeCacheListener; +import org.apache.curator.utils.ZKPaths; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.ImmutableSet; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; + +public class LogSearchConfigLogFeederZK extends LogSearchConfigZK implements LogSearchConfigLogFeeder { + private static final Logger LOG = LoggerFactory.getLogger(LogSearchConfigLogFeederZK.class); + + private static final long WAIT_FOR_ROOT_SLEEP_SECONDS = 10; + + private TreeCache logFeederClusterCache; + + @Override + public void init(Map<String, String> properties, String clusterName) throws Exception { + super.init(properties); + while (client.checkExists().forPath("/") == null) { + LOG.info("Root node is not present yet, going to sleep for " + WAIT_FOR_ROOT_SLEEP_SECONDS + " seconds"); + Thread.sleep(WAIT_FOR_ROOT_SLEEP_SECONDS * 1000); + } + + logFeederClusterCache = new TreeCache(client, String.format("/%s", clusterName)); + } + + @Override + public boolean inputConfigExists(String serviceName) throws Exception { + String nodePath = String.format("/input/%s", serviceName); + return logFeederClusterCache.getCurrentData(nodePath) != null; + } + + @Override + public void monitorInputConfigChanges(final InputConfigMonitor inputConfigMonitor, + final LogLevelFilterMonitor logLevelFilterMonitor, final String clusterName) throws Exception { + final JsonParser parser = new JsonParser(); + final JsonArray globalConfigNode = new JsonArray(); + for (String globalConfigJsonString : inputConfigMonitor.getGlobalConfigJsons()) { + JsonElement globalConfigJson = parser.parse(globalConfigJsonString); + globalConfigNode.add(globalConfigJson.getAsJsonObject().get("global")); + } + + createGlobalConfigNode(globalConfigNode, clusterName); + + TreeCacheListener listener = new TreeCacheListener() { + private final Set<Type> nodeEvents = ImmutableSet.of(Type.NODE_ADDED, Type.NODE_UPDATED, Type.NODE_REMOVED); + + public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { + if (!nodeEvents.contains(event.getType())) { + return; + } + + String nodeName = ZKPaths.getNodeFromPath(event.getData().getPath()); + String nodeData = new String(event.getData().getData()); + Type eventType = event.getType(); + + String configPathStab = String.format("/%s/", clusterName); + + if (event.getData().getPath().startsWith(configPathStab + "input/")) { + handleInputConfigChange(eventType, nodeName, nodeData); + } else if (event.getData().getPath().startsWith(configPathStab + "loglevelfilter/")) { + handleLogLevelFilterChange(eventType, nodeName, nodeData); + } + } + + private void handleInputConfigChange(Type eventType, String nodeName, String nodeData) { + switch (eventType) { + case NODE_ADDED: + LOG.info("Node added under input ZK node: " + nodeName); + addInputs(nodeName, nodeData); + break; + case NODE_UPDATED: + LOG.info("Node updated under input ZK node: " + nodeName); + removeInputs(nodeName); + addInputs(nodeName, nodeData); + break; + case NODE_REMOVED: + LOG.info("Node removed from input ZK node: " + nodeName); + removeInputs(nodeName); + break; + default: + break; + } + } + + private void removeInputs(String serviceName) { + inputConfigMonitor.removeInputs(serviceName); + } + + private void addInputs(String serviceName, String inputConfig) { + try { + JsonElement inputConfigJson = parser.parse(inputConfig); + for (Map.Entry<String, JsonElement> typeEntry : inputConfigJson.getAsJsonObject().entrySet()) { + for (JsonElement e : typeEntry.getValue().getAsJsonArray()) { + for (JsonElement globalConfig : globalConfigNode) { + merge(globalConfig.getAsJsonObject(), e.getAsJsonObject()); + } + } + } + + inputConfigMonitor.loadInputConfigs(serviceName, InputConfigGson.gson.fromJson(inputConfigJson, InputConfigImpl.class)); + } catch (Exception e) { + LOG.error("Could not load input configuration for service " + serviceName + ":\n" + inputConfig, e); + } + } + + private void handleLogLevelFilterChange(Type eventType, String nodeName, String nodeData) { + switch (eventType) { + case NODE_ADDED: + case NODE_UPDATED: + LOG.info("Node added/updated under loglevelfilter ZK node: " + nodeName); + LogLevelFilter logLevelFilter = gson.fromJson(nodeData, LogLevelFilter.class); + logLevelFilterMonitor.setLogLevelFilter(nodeName, logLevelFilter); + break; + case NODE_REMOVED: + LOG.info("Node removed loglevelfilter input ZK node: " + nodeName); + logLevelFilterMonitor.removeLogLevelFilter(nodeName); + break; + default: + break; + } + } + + private void merge(JsonObject source, JsonObject target) { + for (Map.Entry<String, JsonElement> e : source.entrySet()) { + if (!target.has(e.getKey())) { + target.add(e.getKey(), e.getValue()); + } else { + if (e.getValue().isJsonObject()) { + JsonObject valueJson = (JsonObject)e.getValue(); + merge(valueJson, target.get(e.getKey()).getAsJsonObject()); + } + } + } + } + }; + logFeederClusterCache.getListenable().addListener(listener); + logFeederClusterCache.start(); + } + + private void createGlobalConfigNode(JsonArray globalConfigNode, String clusterName) { + String globalConfigNodePath = String.format("/%s/global", clusterName); + String data = InputConfigGson.gson.toJson(globalConfigNode); + + try { + if (logFeederClusterCache.getCurrentData(globalConfigNodePath) != null) { + client.setData().forPath(globalConfigNodePath, data.getBytes()); + } else { + client.create().creatingParentContainersIfNeeded().withACL(getAcls()).forPath(globalConfigNodePath, data.getBytes()); + } + } catch (Exception e) { + LOG.warn("Exception during global config node creation/update", e); + } + } + + @Override + public OutputSolrProperties getOutputSolrProperties(String type) throws Exception { + String nodePath = String.format("/output/solr/%s", type); + ChildData currentData = outputCache.getCurrentData(nodePath); + return currentData == null ? + null : + gson.fromJson(new String(currentData.getData()), OutputSolrPropertiesImpl.class); + } + + @Override + public void monitorOutputProperties(final List<? extends OutputConfigMonitor> outputConfigMonitors) throws Exception { + TreeCacheListener listener = new TreeCacheListener() { + public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { + if (event.getType() != Type.NODE_UPDATED) { + return; + } + + LOG.info("Output config updated: " + event.getData().getPath()); + for (OutputConfigMonitor monitor : outputConfigMonitors) { + String monitorPath = String.format("/output/%s/%s", monitor.getDestination(), monitor.getOutputType()); + if (monitorPath.equals(event.getData().getPath())) { + String nodeData = new String(event.getData().getData()); + OutputSolrProperties outputSolrProperties = gson.fromJson(nodeData, OutputSolrPropertiesImpl.class); + monitor.outputConfigChanged(outputSolrProperties); + } + } + } + }; + outputCache.getListenable().addListener(listener); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/2d1ac668/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigServerZK.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigServerZK.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigServerZK.java new file mode 100644 index 0000000..9973196 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigServerZK.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ambari.logsearch.config.zookeeper; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import org.apache.ambari.logsearch.config.api.LogSearchConfigServer; +import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter; +import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilterMap; +import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputSolrProperties; +import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig; +import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputAdapter; +import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputConfigGson; +import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputConfigImpl; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.TreeCache; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.gson.JsonArray; +import com.google.gson.JsonParser; + +public class LogSearchConfigServerZK extends LogSearchConfigZK implements LogSearchConfigServer { + private static final Logger LOG = LoggerFactory.getLogger(LogSearchConfigServerZK.class); + + private TreeCache serverCache; + + @Override + public void init(Map<String, String> properties) throws Exception { + super.init(properties); + + if (client.checkExists().forPath("/") == null) { + client.create().creatingParentContainersIfNeeded().forPath("/"); + } + if (client.checkExists().forPath("/output") == null) { + client.create().creatingParentContainersIfNeeded().forPath("/output"); + } + serverCache = new TreeCache(client, "/"); + serverCache.start(); + } + + @Override + public boolean inputConfigExists(String clusterName, String serviceName) throws Exception { + String nodePath = String.format("/%s/input/%s", clusterName, serviceName); + return serverCache.getCurrentData(nodePath) != null; + } + + @Override + public void setInputConfig(String clusterName, String serviceName, String inputConfig) throws Exception { + String nodePath = String.format("/%s/input/%s", clusterName, serviceName); + client.setData().forPath(nodePath, inputConfig.getBytes()); + LOG.info("Set input config for the service " + serviceName + " for cluster " + clusterName); + } + + @Override + public List<String> getServices(String clusterName) { + String parentPath = String.format("/%s/input", clusterName); + Map<String, ChildData> serviceNodes = serverCache.getCurrentChildren(parentPath); + return serviceNodes == null ? + new ArrayList<>() : + new ArrayList<>(serviceNodes.keySet()); + } + + @Override + public String getGlobalConfigs(String clusterName) { + String globalConfigNodePath = String.format("/%s/global", clusterName); + return new String(serverCache.getCurrentData(globalConfigNodePath).getData()); + } + + @Override + public InputConfig getInputConfig(String clusterName, String serviceName) { + String globalConfigData = getGlobalConfigs(clusterName); + JsonArray globalConfigs = (JsonArray) new JsonParser().parse(globalConfigData); + InputAdapter.setGlobalConfigs(globalConfigs); + + ChildData childData = serverCache.getCurrentData(String.format("/%s/input/%s", clusterName, serviceName)); + return childData == null ? null : InputConfigGson.gson.fromJson(new String(childData.getData()), InputConfigImpl.class); + } + + @Override + public void setLogLevelFilters(String clusterName, LogLevelFilterMap filters) throws Exception { + for (Map.Entry<String, LogLevelFilter> e : filters.getFilter().entrySet()) { + String nodePath = String.format("/%s/loglevelfilter/%s", clusterName, e.getKey()); + String logLevelFilterJson = gson.toJson(e.getValue()); + String currentLogLevelFilterJson = new String(serverCache.getCurrentData(nodePath).getData()); + if (!logLevelFilterJson.equals(currentLogLevelFilterJson)) { + client.setData().forPath(nodePath, logLevelFilterJson.getBytes()); + LOG.info("Set log level filter for the log " + e.getKey() + " for cluster " + clusterName); + } + } + } + + @Override + public LogLevelFilterMap getLogLevelFilters(String clusterName) { + String parentPath = String.format("/%s/loglevelfilter", clusterName); + Map<String, ChildData> logLevelFilterNodes = serverCache.getCurrentChildren(parentPath); + TreeMap<String, LogLevelFilter> filters = new TreeMap<>(); + for (Map.Entry<String, ChildData> e : logLevelFilterNodes.entrySet()) { + LogLevelFilter logLevelFilter = gson.fromJson(new String(e.getValue().getData()), LogLevelFilter.class); + filters.put(e.getKey(), logLevelFilter); + } + + LogLevelFilterMap logLevelFilters = new LogLevelFilterMap(); + logLevelFilters.setFilter(filters); + return logLevelFilters; + } + + @Override + public void saveOutputSolrProperties(String type, OutputSolrProperties outputSolrProperties) throws Exception { + String nodePath = String.format("/output/solr/%s", type); + String data = gson.toJson(outputSolrProperties); + if (outputCache.getCurrentData(nodePath) == null) { + client.create().creatingParentContainersIfNeeded().withACL(getAcls()).forPath(nodePath, data.getBytes()); + } else { + client.setData().forPath(nodePath, data.getBytes()); + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/2d1ac668/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java index 387d0c6..7037bef 100644 --- a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java +++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java @@ -22,33 +22,16 @@ package org.apache.ambari.logsearch.config.zookeeper; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Set; -import java.util.TreeMap; import org.apache.ambari.logsearch.config.api.LogSearchConfig; import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription; -import org.apache.ambari.logsearch.config.api.OutputConfigMonitor; import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter; -import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilterMap; -import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputSolrProperties; -import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig; -import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputAdapter; -import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputConfigGson; -import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputConfigImpl; -import org.apache.ambari.logsearch.config.zookeeper.model.outputconfig.impl.OutputSolrPropertiesImpl; -import org.apache.ambari.logsearch.config.api.InputConfigMonitor; -import org.apache.ambari.logsearch.config.api.LogLevelFilterMonitor; import org.apache.commons.collections.MapUtils; import org.apache.commons.lang3.StringUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.TreeCache; -import org.apache.curator.framework.recipes.cache.TreeCacheEvent; -import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type; -import org.apache.curator.framework.recipes.cache.TreeCacheListener; import org.apache.curator.retry.ExponentialBackoffRetry; -import org.apache.curator.utils.ZKPaths; import org.apache.zookeeper.KeeperException.NodeExistsException; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.ACL; @@ -57,13 +40,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Splitter; -import com.google.common.collect.ImmutableSet; import com.google.gson.Gson; import com.google.gson.GsonBuilder; -import com.google.gson.JsonArray; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; public class LogSearchConfigZK implements LogSearchConfig { private static final Logger LOG = LoggerFactory.getLogger(LogSearchConfigZK.class); @@ -71,7 +49,6 @@ public class LogSearchConfigZK implements LogSearchConfig { private static final int SESSION_TIMEOUT = 15000; private static final int CONNECTION_TIMEOUT = 30000; private static final String DEFAULT_ZK_ROOT = "/logsearch"; - private static final long WAIT_FOR_ROOT_SLEEP_SECONDS = 10; private static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS"; @LogSearchPropertyDescription( @@ -99,16 +76,12 @@ public class LogSearchConfigZK implements LogSearchConfig { ) private static final String ZK_ROOT_NODE_PROPERTY = "logsearch.config.zk_root"; - private Map<String, String> properties; - private CuratorFramework client; - private Gson gson; + protected Map<String, String> properties; + protected CuratorFramework client; + protected TreeCache outputCache; + protected Gson gson; - private TreeCache serverCache; - private TreeCache logFeederClusterCache; - private TreeCache outputCache; - - @Override - public void init(Component component, Map<String, String> properties, String clusterName) throws Exception { + public void init(Map<String, String> properties) throws Exception { this.properties = properties; String root = MapUtils.getString(properties, ZK_ROOT_NODE_PROPERTY, DEFAULT_ZK_ROOT); @@ -124,39 +97,10 @@ public class LogSearchConfigZK implements LogSearchConfig { outputCache = new TreeCache(client, "/output"); outputCache.start(); - if (component == Component.SERVER) { - if (client.checkExists().forPath("/") == null) { - client.create().creatingParentContainersIfNeeded().forPath("/"); - } - if (client.checkExists().forPath("/output") == null) { - client.create().creatingParentContainersIfNeeded().forPath("/output"); - } - serverCache = new TreeCache(client, "/"); - serverCache.start(); - } else { - while (client.checkExists().forPath("/") == null) { - LOG.info("Root node is not present yet, going to sleep for " + WAIT_FOR_ROOT_SLEEP_SECONDS + " seconds"); - Thread.sleep(WAIT_FOR_ROOT_SLEEP_SECONDS * 1000); - } - logFeederClusterCache = new TreeCache(client, String.format("/%s", clusterName)); - } - gson = new GsonBuilder().setDateFormat(DATE_FORMAT).create(); } @Override - public boolean inputConfigExistsLogFeeder(String serviceName) throws Exception { - String nodePath = String.format("/input/%s", serviceName); - return logFeederClusterCache.getCurrentData(nodePath) != null; - } - - @Override - public boolean inputConfigExistsServer(String clusterName, String serviceName) throws Exception { - String nodePath = String.format("/%s/input/%s", clusterName, serviceName); - return serverCache.getCurrentData(nodePath) != null; - } - - @Override public void createInputConfig(String clusterName, String serviceName, String inputConfig) throws Exception { String nodePath = String.format("/%s/input/%s", clusterName, serviceName); try { @@ -168,159 +112,6 @@ public class LogSearchConfigZK implements LogSearchConfig { } @Override - public void setInputConfig(String clusterName, String serviceName, String inputConfig) throws Exception { - String nodePath = String.format("/%s/input/%s", clusterName, serviceName); - client.setData().forPath(nodePath, inputConfig.getBytes()); - LOG.info("Set input config for the service " + serviceName + " for cluster " + clusterName); - } - - @Override - public void monitorInputConfigChanges(final InputConfigMonitor inputConfigMonitor, - final LogLevelFilterMonitor logLevelFilterMonitor, final String clusterName) throws Exception { - final JsonParser parser = new JsonParser(); - final JsonArray globalConfigNode = new JsonArray(); - for (String globalConfigJsonString : inputConfigMonitor.getGlobalConfigJsons()) { - JsonElement globalConfigJson = parser.parse(globalConfigJsonString); - globalConfigNode.add(globalConfigJson.getAsJsonObject().get("global")); - } - - createGlobalConfigNode(globalConfigNode, clusterName); - - TreeCacheListener listener = new TreeCacheListener() { - private final Set<Type> nodeEvents = ImmutableSet.of(Type.NODE_ADDED, Type.NODE_UPDATED, Type.NODE_REMOVED); - - public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { - if (!nodeEvents.contains(event.getType())) { - return; - } - - String nodeName = ZKPaths.getNodeFromPath(event.getData().getPath()); - String nodeData = new String(event.getData().getData()); - Type eventType = event.getType(); - - String configPathStab = String.format("/%s/", clusterName); - - if (event.getData().getPath().startsWith(configPathStab + "input/")) { - handleInputConfigChange(eventType, nodeName, nodeData); - } else if (event.getData().getPath().startsWith(configPathStab + "loglevelfilter/")) { - handleLogLevelFilterChange(eventType, nodeName, nodeData); - } - } - - private void handleInputConfigChange(Type eventType, String nodeName, String nodeData) { - switch (eventType) { - case NODE_ADDED: - LOG.info("Node added under input ZK node: " + nodeName); - addInputs(nodeName, nodeData); - break; - case NODE_UPDATED: - LOG.info("Node updated under input ZK node: " + nodeName); - removeInputs(nodeName); - addInputs(nodeName, nodeData); - break; - case NODE_REMOVED: - LOG.info("Node removed from input ZK node: " + nodeName); - removeInputs(nodeName); - break; - default: - break; - } - } - - private void removeInputs(String serviceName) { - inputConfigMonitor.removeInputs(serviceName); - } - - private void addInputs(String serviceName, String inputConfig) { - try { - JsonElement inputConfigJson = parser.parse(inputConfig); - for (Map.Entry<String, JsonElement> typeEntry : inputConfigJson.getAsJsonObject().entrySet()) { - for (JsonElement e : typeEntry.getValue().getAsJsonArray()) { - for (JsonElement globalConfig : globalConfigNode) { - merge(globalConfig.getAsJsonObject(), e.getAsJsonObject()); - } - } - } - - inputConfigMonitor.loadInputConfigs(serviceName, InputConfigGson.gson.fromJson(inputConfigJson, InputConfigImpl.class)); - } catch (Exception e) { - LOG.error("Could not load input configuration for service " + serviceName + ":\n" + inputConfig, e); - } - } - - private void handleLogLevelFilterChange(Type eventType, String nodeName, String nodeData) { - switch (eventType) { - case NODE_ADDED: - case NODE_UPDATED: - LOG.info("Node added/updated under loglevelfilter ZK node: " + nodeName); - LogLevelFilter logLevelFilter = gson.fromJson(nodeData, LogLevelFilter.class); - logLevelFilterMonitor.setLogLevelFilter(nodeName, logLevelFilter); - break; - case NODE_REMOVED: - LOG.info("Node removed loglevelfilter input ZK node: " + nodeName); - logLevelFilterMonitor.removeLogLevelFilter(nodeName); - break; - default: - break; - } - } - - private void merge(JsonObject source, JsonObject target) { - for (Map.Entry<String, JsonElement> e : source.entrySet()) { - if (!target.has(e.getKey())) { - target.add(e.getKey(), e.getValue()); - } else { - if (e.getValue().isJsonObject()) { - JsonObject valueJson = (JsonObject)e.getValue(); - merge(valueJson, target.get(e.getKey()).getAsJsonObject()); - } - } - } - } - }; - logFeederClusterCache.getListenable().addListener(listener); - logFeederClusterCache.start(); - } - - private void createGlobalConfigNode(JsonArray globalConfigNode, String clusterName) { - String globalConfigNodePath = String.format("/%s/global", clusterName); - String data = InputConfigGson.gson.toJson(globalConfigNode); - - try { - if (logFeederClusterCache.getCurrentData(globalConfigNodePath) != null) { - client.setData().forPath(globalConfigNodePath, data.getBytes()); - } else { - client.create().creatingParentContainersIfNeeded().withACL(getAcls()).forPath(globalConfigNodePath, data.getBytes()); - } - } catch (Exception e) { - LOG.warn("Exception during global config node creation/update", e); - } - } - - @Override - public List<String> getServices(String clusterName) { - String parentPath = String.format("/%s/input", clusterName); - Map<String, ChildData> serviceNodes = serverCache.getCurrentChildren(parentPath); - return new ArrayList<String>(serviceNodes.keySet()); - } - - @Override - public String getGlobalConfigs(String clusterName) { - String globalConfigNodePath = String.format("/%s/global", clusterName); - return new String(serverCache.getCurrentData(globalConfigNodePath).getData()); - } - - @Override - public InputConfig getInputConfig(String clusterName, String serviceName) { - String globalConfigData = getGlobalConfigs(clusterName); - JsonArray globalConfigs = (JsonArray) new JsonParser().parse(globalConfigData); - InputAdapter.setGlobalConfigs(globalConfigs); - - ChildData childData = serverCache.getCurrentData(String.format("/%s/input/%s", clusterName, serviceName)); - return childData == null ? null : InputConfigGson.gson.fromJson(new String(childData.getData()), InputConfigImpl.class); - } - - @Override public void createLogLevelFilter(String clusterName, String logId, LogLevelFilter filter) throws Exception { String nodePath = String.format("/%s/loglevelfilter/%s", clusterName, logId); String logLevelFilterJson = gson.toJson(filter); @@ -332,35 +123,7 @@ public class LogSearchConfigZK implements LogSearchConfig { } } - @Override - public void setLogLevelFilters(String clusterName, LogLevelFilterMap filters) throws Exception { - for (Map.Entry<String, LogLevelFilter> e : filters.getFilter().entrySet()) { - String nodePath = String.format("/%s/loglevelfilter/%s", clusterName, e.getKey()); - String logLevelFilterJson = gson.toJson(e.getValue()); - String currentLogLevelFilterJson = new String(serverCache.getCurrentData(nodePath).getData()); - if (!logLevelFilterJson.equals(currentLogLevelFilterJson)) { - client.setData().forPath(nodePath, logLevelFilterJson.getBytes()); - LOG.info("Set log level filter for the log " + e.getKey() + " for cluster " + clusterName); - } - } - } - - @Override - public LogLevelFilterMap getLogLevelFilters(String clusterName) { - String parentPath = String.format("/%s/loglevelfilter", clusterName); - Map<String, ChildData> logLevelFilterNodes = serverCache.getCurrentChildren(parentPath); - TreeMap<String, LogLevelFilter> filters = new TreeMap<>(); - for (Map.Entry<String, ChildData> e : logLevelFilterNodes.entrySet()) { - LogLevelFilter logLevelFilter = gson.fromJson(new String(e.getValue().getData()), LogLevelFilter.class); - filters.put(e.getKey(), logLevelFilter); - } - - LogLevelFilterMap logLevelFilters = new LogLevelFilterMap(); - logLevelFilters.setFilter(filters); - return logLevelFilters; - } - - private List<ACL> getAcls() { + protected List<ACL> getAcls() { String aclStr = properties.get(ZK_ACLS_PROPERTY); if (StringUtils.isBlank(aclStr)) { return ZooDefs.Ids.OPEN_ACL_UNSAFE; @@ -404,48 +167,6 @@ public class LogSearchConfigZK implements LogSearchConfig { } @Override - public void saveOutputSolrProperties(String type, OutputSolrProperties outputSolrProperties) throws Exception { - String nodePath = String.format("/output/solr/%s", type); - String data = gson.toJson(outputSolrProperties); - if (outputCache.getCurrentData(nodePath) == null) { - client.create().creatingParentContainersIfNeeded().withACL(getAcls()).forPath(nodePath, data.getBytes()); - } else { - client.setData().forPath(nodePath, data.getBytes()); - } - } - - @Override - public OutputSolrProperties getOutputSolrProperties(String type) throws Exception { - String nodePath = String.format("/output/solr/%s", type); - ChildData currentData = outputCache.getCurrentData(nodePath); - return currentData == null ? - null : - gson.fromJson(new String(currentData.getData()), OutputSolrPropertiesImpl.class); - } - - @Override - public void monitorOutputProperties(final List<? extends OutputConfigMonitor> outputConfigMonitors) throws Exception { - TreeCacheListener listener = new TreeCacheListener() { - public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { - if (event.getType() != Type.NODE_UPDATED) { - return; - } - - LOG.info("Output config updated: " + event.getData().getPath()); - for (OutputConfigMonitor monitor : outputConfigMonitors) { - String monitorPath = String.format("/output/%s/%s", monitor.getDestination(), monitor.getOutputType()); - if (monitorPath.equals(event.getData().getPath())) { - String nodeData = new String(event.getData().getData()); - OutputSolrProperties outputSolrProperties = gson.fromJson(nodeData, OutputSolrPropertiesImpl.class); - monitor.outputConfigChanged(outputSolrProperties); - } - } - } - }; - outputCache.getListenable().addListener(listener); - } - - @Override public void close() { LOG.info("Closing ZooKeeper Connection"); client.close(); http://git-wip-us.apache.org/repos/asf/ambari/blob/2d1ac668/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java index 2461819..5114743 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java @@ -27,10 +27,9 @@ import java.util.Map; import org.apache.ambari.logfeeder.common.ConfigHandler; import org.apache.ambari.logfeeder.common.LogEntryParseTester; -import org.apache.ambari.logsearch.config.api.LogSearchConfig; import org.apache.ambari.logsearch.config.api.LogSearchConfigFactory; -import org.apache.ambari.logsearch.config.api.LogSearchConfig.Component; -import org.apache.ambari.logsearch.config.zookeeper.LogSearchConfigZK; +import org.apache.ambari.logsearch.config.api.LogSearchConfigLogFeeder; +import org.apache.ambari.logsearch.config.zookeeper.LogSearchConfigLogFeederZK; import org.apache.commons.io.FileUtils; import org.apache.ambari.logfeeder.input.InputConfigUploader; import org.apache.ambari.logfeeder.loglevelfilter.LogLevelFilterHandler; @@ -54,7 +53,7 @@ public class LogFeeder { private final LogFeederCommandLine cli; private ConfigHandler configHandler; - private LogSearchConfig config; + private LogSearchConfigLogFeeder config; private MetricsManager metricsManager = new MetricsManager(); @@ -80,8 +79,8 @@ public class LogFeeder { SSLUtil.ensureStorePasswords(); - config = LogSearchConfigFactory.createLogSearchConfig(Component.LOGFEEDER,Maps.fromProperties(LogFeederPropertiesUtil.getProperties()), - LogFeederPropertiesUtil.getClusterName(), LogSearchConfigZK.class); + config = LogSearchConfigFactory.createLogSearchConfigLogFeeder(Maps.fromProperties(LogFeederPropertiesUtil.getProperties()), + LogFeederPropertiesUtil.getClusterName(), LogSearchConfigLogFeederZK.class); configHandler = new ConfigHandler(config); configHandler.init(); LogLevelFilterHandler.init(config); http://git-wip-us.apache.org/repos/asf/ambari/blob/2d1ac668/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java index 11df9dc..243b344 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java @@ -49,7 +49,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.ambari.logfeeder.util.AliasUtil.AliasType; import org.apache.ambari.logfeeder.util.LogFeederPropertiesUtil; import org.apache.ambari.logsearch.config.api.InputConfigMonitor; -import org.apache.ambari.logsearch.config.api.LogSearchConfig; +import org.apache.ambari.logsearch.config.api.LogSearchConfigLogFeeder; import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterDescriptor; import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterGrokDescriptor; import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig; @@ -65,7 +65,7 @@ import com.google.gson.reflect.TypeToken; public class ConfigHandler implements InputConfigMonitor { private static final Logger LOG = Logger.getLogger(ConfigHandler.class); - private final LogSearchConfig logSearchConfig; + private final LogSearchConfigLogFeeder logSearchConfig; private final OutputManager outputManager = new OutputManager(); private final InputManager inputManager = new InputManager(); @@ -79,7 +79,7 @@ public class ConfigHandler implements InputConfigMonitor { private boolean simulateMode = false; - public ConfigHandler(LogSearchConfig logSearchConfig) { + public ConfigHandler(LogSearchConfigLogFeeder logSearchConfig) { this.logSearchConfig = logSearchConfig; }