AMBARI-20881 Add Log Level Filter to the Log Search config API (mgergely) Change-Id: I8e3d5a628d02407ad2af4ecb77fff3ada10f7707
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/3b94d3cf Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/3b94d3cf Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/3b94d3cf Branch: refs/heads/trunk Commit: 3b94d3cff380bf3557af57c6ecd9005ab46b8916 Parents: f18a822 Author: Miklos Gergely <mgerg...@hortonworks.com> Authored: Wed May 17 14:47:06 2017 +0200 Committer: Miklos Gergely <mgerg...@hortonworks.com> Committed: Wed May 17 14:47:40 2017 +0200 ---------------------------------------------------------------------- .../ambari-logsearch-config-api/pom.xml | 2 +- .../config/api/InputConfigMonitor.java | 4 +- .../config/api/LogLevelFilterMonitor.java | 44 ++++ .../logsearch/config/api/LogSearchConfig.java | 57 ++++- .../model/loglevelfilter/LogLevelFilter.java | 79 +++++++ .../model/loglevelfilter/LogLevelFilterMap.java | 33 +++ .../config/api/LogSearchConfigClass1.java | 21 +- .../config/api/LogSearchConfigClass2.java | 21 +- .../ambari-logsearch-config-zookeeper/pom.xml | 4 + .../config/zookeeper/LogSearchConfigZK.java | 191 ++++++++++++----- .../org/apache/ambari/logfeeder/LogFeeder.java | 6 +- .../logfeeder/input/InputConfigUploader.java | 2 +- .../logfeeder/logconfig/FilterLogData.java | 87 -------- .../logfeeder/logconfig/LogConfigFetcher.java | 168 --------------- .../logfeeder/logconfig/LogConfigHandler.java | 213 ------------------- .../logfeeder/logconfig/LogFeederFilter.java | 90 -------- .../logconfig/LogFeederFilterWrapper.java | 55 ----- .../logfeeder/loglevelfilter/FilterLogData.java | 73 +++++++ .../loglevelfilter/LogLevelFilterHandler.java | 157 ++++++++++++++ .../ambari/logfeeder/output/OutputManager.java | 2 +- .../ambari/logfeeder/util/LogFeederUtil.java | 19 -- .../logconfig/LogConfigHandlerTest.java | 90 ++++---- .../src/test/resources/logfeeder.properties | 3 +- .../configurer/LogfeederFilterConfigurer.java | 66 ------ .../ambari/logsearch/dao/UserConfigSolrDao.java | 79 ------- .../ambari/logsearch/doc/DocConstants.java | 10 +- .../logsearch/manager/ShipperConfigManager.java | 45 +++- .../logsearch/manager/UserConfigManager.java | 24 --- .../model/common/LSServerLogLevelFilter.java | 100 +++++++++ .../model/common/LSServerLogLevelFilterMap.java | 65 ++++++ .../model/common/LogFeederDataMap.java | 50 ----- .../model/common/LogfeederFilterData.java | 87 -------- .../logsearch/rest/ShipperConfigResource.java | 43 +++- .../logsearch/rest/UserConfigResource.java | 18 -- .../webapp/templates/common/Header_tmpl.html | 5 +- .../server/upgrade/UpgradeCatalog300.java | 15 ++ .../configuration/logfeeder-properties.xml | 10 + .../configuration/logsearch-properties.xml | 10 - .../LOGSEARCH/0.5.0/themes/theme.json | 4 +- .../server/upgrade/UpgradeCatalog300Test.java | 29 +++ 40 files changed, 982 insertions(+), 1099 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-config-api/pom.xml ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-config-api/pom.xml b/ambari-logsearch/ambari-logsearch-config-api/pom.xml index e9abed0..72fcc80 100644 --- a/ambari-logsearch/ambari-logsearch-config-api/pom.xml +++ b/ambari-logsearch/ambari-logsearch-config-api/pom.xml @@ -33,7 +33,7 @@ <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> - + <dependencies> <dependency> <groupId>junit</groupId> http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/InputConfigMonitor.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/InputConfigMonitor.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/InputConfigMonitor.java index df26920..29a82a6 100644 --- a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/InputConfigMonitor.java +++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/InputConfigMonitor.java @@ -20,7 +20,7 @@ package org.apache.ambari.logsearch.config.api; /** - * Monitors input configuration changes. + * Monitors input configuration changes. */ public interface InputConfigMonitor { /** @@ -31,7 +31,7 @@ public interface InputConfigMonitor { * @throws Exception */ void loadInputConfigs(String serviceName, String inputConfig) throws Exception; - + /** * Notification of the removal of an input configuration. * http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogLevelFilterMonitor.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogLevelFilterMonitor.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogLevelFilterMonitor.java new file mode 100644 index 0000000..766f751 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogLevelFilterMonitor.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Monitors log level filter changes. + */ +package org.apache.ambari.logsearch.config.api; + +import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter; + +public interface LogLevelFilterMonitor { + + /** + * Notification of a new or updated log level filter. + * + * @param logId The log for which the log level filter was created/updated. + * @param logLevelFilter The log level filter to apply from now on to the log. + */ + void setLogLevelFilter(String logId, LogLevelFilter logLevelFilter); + + /** + * Notification of the removal of a log level filter. + * + * @param logId The log of which's log level filter was removed. + */ + void removeLogLevelFilter(String logId); + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java index 0bb0b78..07921d0 100644 --- a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java +++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java @@ -23,6 +23,9 @@ import java.io.Closeable; import java.util.List; import java.util.Map; +import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter; +import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilterMap; + /** * Log Search Configuration, which uploads, retrieves configurations, and monitors it's changes. */ @@ -33,7 +36,7 @@ public interface LogSearchConfig extends Closeable { public enum Component { SERVER, LOGFEEDER; } - + /** * Initialization of the configuration. * @@ -42,7 +45,7 @@ public interface LogSearchConfig extends Closeable { * @throws Exception */ void init(Component component, Map<String, String> properties) throws Exception; - + /** * Returns all the service names with input configurations of a cluster. Will be used only in SERVER mode. * @@ -50,7 +53,7 @@ public interface LogSearchConfig extends Closeable { * @return List of the service names. */ List<String> getServices(String clusterName); - + /** * Checks if input configuration exists. * @@ -60,7 +63,7 @@ public interface LogSearchConfig extends Closeable { * @throws Exception */ boolean inputConfigExists(String clusterName, String serviceName) throws Exception; - + /** * Returns the input configuration of a service in a cluster. Will be used only in SERVER mode. * @@ -69,7 +72,7 @@ public interface LogSearchConfig extends Closeable { * @return The input configuration for the service if it exists, null otherwise. */ String getInputConfig(String clusterName, String serviceName); - + /** * Uploads the input configuration for a service in a cluster. * @@ -78,13 +81,51 @@ public interface LogSearchConfig extends Closeable { * @param inputConfig The input configuration of the service. * @throws Exception */ + void createInputConfig(String clusterName, String serviceName, String inputConfig) throws Exception; + + /** + * Modifies the input configuration for a service in a cluster. + * + * @param clusterName The name of the cluster where the service is. + * @param serviceName The name of the service of which's input configuration is uploaded. + * @param inputConfig The input configuration of the service. + * @throws Exception + */ void setInputConfig(String clusterName, String serviceName, String inputConfig) throws Exception; - + + /** + * Uploads the log level filter of a log. + * + * @param clusterName The name of the cluster where the log is. + * @param logId The id of the log. + * @param filter The log level filter for the log. + * @throws Exception + */ + void createLogLevelFilter(String clusterName, String logId, LogLevelFilter filter) throws Exception; + + /** + * Modifies the log level filters for all the logs. + * + * @param clusterName The name of the cluster where the logs are. + * @param filters The log level filters to set. + * @throws Exception + */ + void setLogLevelFilters(String clusterName, LogLevelFilterMap filters) throws Exception; + + /** + * Returns the Log Level Filters of a cluster. + * + * @param clusterName The name of the cluster which's log level filters are required. + * @return All the log level filters of the cluster. + */ + LogLevelFilterMap getLogLevelFilters(String clusterName); + /** * Starts the monitoring of the input configurations, asynchronously. Will be used only in LOGFEEDER mode. * - * @param configMonitor The input config monitor to call in case of a config change. + * @param inputConfigMonitor The input config monitor to call in case of an input config change. + * @param logLevelFilterMonitor The log level filter monitor to call in case of a log level filter change. * @throws Exception */ - void monitorInputConfigChanges(InputConfigMonitor configMonitor) throws Exception; + void monitorInputConfigChanges(InputConfigMonitor inputConfigMonitor, LogLevelFilterMonitor logLevelFilterMonitor) throws Exception; } http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/loglevelfilter/LogLevelFilter.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/loglevelfilter/LogLevelFilter.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/loglevelfilter/LogLevelFilter.java new file mode 100644 index 0000000..06cf589 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/loglevelfilter/LogLevelFilter.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.logsearch.config.api.model.loglevelfilter; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + +public class LogLevelFilter { + + private String label; + private List<String> hosts; + private List<String> defaultLevels; + private List<String> overrideLevels; + private Date expiryTime; + + public LogLevelFilter() { + hosts = new ArrayList<String>(); + defaultLevels = new ArrayList<String>(); + overrideLevels = new ArrayList<String>(); + } + + public String getLabel() { + return label; + } + + public void setLabel(String label) { + this.label = label; + } + + public List<String> getHosts() { + return hosts; + } + + public void setHosts(List<String> hosts) { + this.hosts = hosts; + } + + public List<String> getDefaultLevels() { + return defaultLevels; + } + + public void setDefaultLevels(List<String> defaultLevels) { + this.defaultLevels = defaultLevels; + } + + public List<String> getOverrideLevels() { + return overrideLevels; + } + + public void setOverrideLevels(List<String> overrideLevels) { + this.overrideLevels = overrideLevels; + } + + public Date getExpiryTime() { + return expiryTime; + } + + public void setExpiryTime(Date expiryTime) { + this.expiryTime = expiryTime; + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/loglevelfilter/LogLevelFilterMap.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/loglevelfilter/LogLevelFilterMap.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/loglevelfilter/LogLevelFilterMap.java new file mode 100644 index 0000000..37fdb9f --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/loglevelfilter/LogLevelFilterMap.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.logsearch.config.api.model.loglevelfilter; + +import java.util.TreeMap; + +public class LogLevelFilterMap { + private TreeMap<String, LogLevelFilter> filter; + + public TreeMap<String, LogLevelFilter> getFilter() { + return filter; + } + + public void setFilter(TreeMap<String, LogLevelFilter> filter) { + this.filter = filter; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass1.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass1.java b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass1.java index 969eb30..fc3fe5b 100644 --- a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass1.java +++ b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass1.java @@ -24,6 +24,8 @@ import java.util.Map; import org.apache.ambari.logsearch.config.api.InputConfigMonitor; import org.apache.ambari.logsearch.config.api.LogSearchConfig; +import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter; +import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilterMap; public class LogSearchConfigClass1 implements LogSearchConfig { @Override @@ -35,10 +37,14 @@ public class LogSearchConfigClass1 implements LogSearchConfig { } @Override + public void createInputConfig(String clusterName, String serviceName, String inputConfig) throws Exception {} + + @Override public void setInputConfig(String clusterName, String serviceName, String inputConfig) throws Exception {} @Override - public void monitorInputConfigChanges(InputConfigMonitor configMonitor) throws Exception {} + public void monitorInputConfigChanges(InputConfigMonitor inputConfigMonitor, LogLevelFilterMonitor logLevelFilterMonitor) + throws Exception {} @Override public List<String> getServices(String clusterName) { @@ -49,7 +55,18 @@ public class LogSearchConfigClass1 implements LogSearchConfig { public String getInputConfig(String clusterName, String serviceName) { return null; } - + + @Override + public void createLogLevelFilter(String clusterName, String logId, LogLevelFilter filter) {} + + @Override + public void setLogLevelFilters(String clusterName, LogLevelFilterMap filters) throws Exception {} + + @Override + public LogLevelFilterMap getLogLevelFilters(String clusterName) { + return null; + } + @Override public void close() {} } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass2.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass2.java b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass2.java index 664ecc9..346edb3 100644 --- a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass2.java +++ b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass2.java @@ -24,6 +24,8 @@ import java.util.Map; import org.apache.ambari.logsearch.config.api.InputConfigMonitor; import org.apache.ambari.logsearch.config.api.LogSearchConfig; +import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter; +import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilterMap; public class LogSearchConfigClass2 implements LogSearchConfig { @Override @@ -35,10 +37,14 @@ public class LogSearchConfigClass2 implements LogSearchConfig { } @Override + public void createInputConfig(String clusterName, String serviceName, String inputConfig) throws Exception {} + + @Override public void setInputConfig(String clusterName, String serviceName, String inputConfig) throws Exception {} @Override - public void monitorInputConfigChanges(InputConfigMonitor configMonitor) throws Exception {} + public void monitorInputConfigChanges(InputConfigMonitor inputConfigMonitor, LogLevelFilterMonitor logLevelFilterMonitor) + throws Exception {} @Override public List<String> getServices(String clusterName) { @@ -49,7 +55,18 @@ public class LogSearchConfigClass2 implements LogSearchConfig { public String getInputConfig(String clusterName, String serviceName) { return null; } - + + @Override + public void createLogLevelFilter(String clusterName, String logId, LogLevelFilter filter) {} + + @Override + public void setLogLevelFilters(String clusterName, LogLevelFilterMap filters) throws Exception {} + + @Override + public LogLevelFilterMap getLogLevelFilters(String clusterName) { + return null; + } + @Override public void close() {} } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-config-zookeeper/pom.xml ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/pom.xml b/ambari-logsearch/ambari-logsearch-config-zookeeper/pom.xml index 4ed8eba..2c59a4a 100644 --- a/ambari-logsearch/ambari-logsearch-config-zookeeper/pom.xml +++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/pom.xml @@ -70,5 +70,9 @@ <artifactId>curator-recipes</artifactId> <version>2.12.0</version> </dependency> + <dependency> + <groupId>com.google.code.gson</groupId> + <artifactId>gson</artifactId> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java index 738fde2..5e22374 100644 --- a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java +++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java @@ -22,9 +22,13 @@ package org.apache.ambari.logsearch.config.zookeeper; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.TreeMap; import org.apache.ambari.logsearch.config.api.LogSearchConfig; +import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter; +import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilterMap; import org.apache.ambari.logsearch.config.api.InputConfigMonitor; +import org.apache.ambari.logsearch.config.api.LogLevelFilterMonitor; import org.apache.commons.collections.MapUtils; import org.apache.commons.lang3.StringUtils; import org.apache.curator.framework.CuratorFramework; @@ -32,15 +36,19 @@ import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.TreeCache; import org.apache.curator.framework.recipes.cache.TreeCacheEvent; +import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type; import org.apache.curator.framework.recipes.cache.TreeCacheListener; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.utils.ZKPaths; import org.apache.log4j.Logger; +import org.apache.zookeeper.KeeperException.NodeExistsException; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Id; import com.google.common.base.Splitter; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; public class LogSearchConfigZK implements LogSearchConfig { private static final Logger LOG = Logger.getLogger(LogSearchConfigZK.class); @@ -49,6 +57,7 @@ public class LogSearchConfigZK implements LogSearchConfig { private static final int CONNECTION_TIMEOUT = 30000; private static final String DEFAULT_ZK_ROOT = "/logsearch"; private static final long WAIT_FOR_ROOT_SLEEP_SECONDS = 10; + private static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS"; private static final String CLUSTER_NAME_PROPERTY = "cluster.name"; private static final String ZK_CONNECT_STRING_PROPERTY = "logsearch.config.zk_connect_string"; @@ -59,6 +68,7 @@ public class LogSearchConfigZK implements LogSearchConfig { private String root; private CuratorFramework client; private TreeCache cache; + private Gson gson; @Override public void init(Component component, Map<String, String> properties) throws Exception { @@ -89,6 +99,8 @@ public class LogSearchConfigZK implements LogSearchConfig { cache = new TreeCache(client, String.format("%s/%s", root, properties.get(CLUSTER_NAME_PROPERTY))); } + + gson = new GsonBuilder().setDateFormat(DATE_FORMAT).create(); } @Override @@ -98,66 +110,43 @@ public class LogSearchConfigZK implements LogSearchConfig { } @Override - public void setInputConfig(String clusterName, String serviceName, String inputConfig) throws Exception { + public void createInputConfig(String clusterName, String serviceName, String inputConfig) throws Exception { String nodePath = String.format("%s/%s/input/%s", root, clusterName, serviceName); - client.create().creatingParentContainersIfNeeded().withACL(getAcls()).forPath(nodePath, inputConfig.getBytes()); - LOG.info("Set input config for the service " + serviceName + " for cluster " + clusterName); - } - - private List<ACL> getAcls() { - String aclStr = properties.get(ZK_ACLS_PROPERTY); - if (StringUtils.isBlank(aclStr)) { - return ZooDefs.Ids.OPEN_ACL_UNSAFE; + try { + client.create().creatingParentContainersIfNeeded().withACL(getAcls()).forPath(nodePath, inputConfig.getBytes()); + LOG.info("Uploaded input config for the service " + serviceName + " for cluster " + clusterName); + } catch (NodeExistsException e) { + LOG.debug("Did not upload input config for service " + serviceName + " as it was already uploaded by another Log Feeder"); } - - List<ACL> acls = new ArrayList<>(); - List<String> aclStrList = Splitter.on(",").omitEmptyStrings().trimResults().splitToList(aclStr); - for (String unparcedAcl : aclStrList) { - String[] parts = unparcedAcl.split(":"); - if (parts.length == 3) { - acls.add(new ACL(parsePermission(parts[2]), new Id(parts[0], parts[1]))); - } - } - return acls; } - private Integer parsePermission(String permission) { - int permissionCode = 0; - for (char each : permission.toLowerCase().toCharArray()) { - switch (each) { - case 'r': - permissionCode |= ZooDefs.Perms.READ; - break; - case 'w': - permissionCode |= ZooDefs.Perms.WRITE; - break; - case 'c': - permissionCode |= ZooDefs.Perms.CREATE; - break; - case 'd': - permissionCode |= ZooDefs.Perms.DELETE; - break; - case 'a': - permissionCode |= ZooDefs.Perms.ADMIN; - break; - default: - throw new IllegalArgumentException("Unsupported permission: " + permission); - } - } - return permissionCode; + @Override + public void setInputConfig(String clusterName, String serviceName, String inputConfig) throws Exception { + String nodePath = String.format("%s/%s/input/%s", root, clusterName, serviceName); + client.setData().forPath(nodePath, inputConfig.getBytes()); + LOG.info("Set input config for the service " + serviceName + " for cluster " + clusterName); } @Override - public void monitorInputConfigChanges(final InputConfigMonitor configMonitor) throws Exception { + public void monitorInputConfigChanges(final InputConfigMonitor inputConfigMonitor, + final LogLevelFilterMonitor logLevelFilterMonitor ) throws Exception { TreeCacheListener listener = new TreeCacheListener() { public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { - if (!event.getData().getPath().startsWith(String.format("%s/%s/input/", root, properties.get(CLUSTER_NAME_PROPERTY)))) { - return; - } - String nodeName = ZKPaths.getNodeFromPath(event.getData().getPath()); String nodeData = new String(event.getData().getData()); - switch (event.getType()) { + Type eventType = event.getType(); + + String configPathStab = String.format("%s/%s/", root, properties.get(CLUSTER_NAME_PROPERTY)); + + if (event.getData().getPath().startsWith(configPathStab + "input/")) { + handleInputConfigChange(eventType, nodeName, nodeData); + } else if (event.getData().getPath().startsWith(configPathStab + "loglevelfilter/")) { + handleLogLevelFilterChange(eventType, nodeName, nodeData); + } + } + + private void handleInputConfigChange(Type eventType, String nodeName, String nodeData) { + switch (eventType) { case NODE_ADDED: LOG.info("Node added under input ZK node: " + nodeName); addInputs(nodeName, nodeData); @@ -177,16 +166,33 @@ public class LogSearchConfigZK implements LogSearchConfig { } private void removeInputs(String serviceName) { - configMonitor.removeInputs(serviceName); + inputConfigMonitor.removeInputs(serviceName); } private void addInputs(String serviceName, String inputConfig) { try { - configMonitor.loadInputConfigs(serviceName, inputConfig); + inputConfigMonitor.loadInputConfigs(serviceName, inputConfig); } catch (Exception e) { LOG.error("Could not load input configuration for service " + serviceName + ":\n" + inputConfig, e); } } + + private void handleLogLevelFilterChange(Type eventType, String nodeName, String nodeData) { + switch (eventType) { + case NODE_ADDED: + case NODE_UPDATED: + LOG.info("Node added/updated under loglevelfilter ZK node: " + nodeName); + LogLevelFilter logLevelFilter = gson.fromJson(nodeData, LogLevelFilter.class); + logLevelFilterMonitor.setLogLevelFilter(nodeName, logLevelFilter); + break; + case NODE_REMOVED: + LOG.info("Node removed loglevelfilter input ZK node: " + nodeName); + logLevelFilterMonitor.removeLogLevelFilter(nodeName); + break; + default: + break; + } + } }; cache.getListenable().addListener(listener); cache.start(); @@ -206,6 +212,89 @@ public class LogSearchConfigZK implements LogSearchConfig { } @Override + public void createLogLevelFilter(String clusterName, String logId, LogLevelFilter filter) throws Exception { + String nodePath = String.format("%s/%s/loglevelfilter/%s", root, clusterName, logId); + String logLevelFilterJson = gson.toJson(filter); + try { + client.create().creatingParentContainersIfNeeded().withACL(getAcls()).forPath(nodePath, logLevelFilterJson.getBytes()); + LOG.info("Uploaded log level filter for the log " + logId + " for cluster " + clusterName); + } catch (NodeExistsException e) { + LOG.debug("Did not upload log level filters for log " + logId + " as it was already uploaded by another Log Feeder"); + } + } + + @Override + public void setLogLevelFilters(String clusterName, LogLevelFilterMap filters) throws Exception { + for (Map.Entry<String, LogLevelFilter> e : filters.getFilter().entrySet()) { + String nodePath = String.format("%s/%s/loglevelfilter/%s", root, clusterName, e.getKey()); + String logLevelFilterJson = gson.toJson(e.getValue()); + String currentLogLevelFilterJson = new String(cache.getCurrentData(nodePath).getData()); + if (!logLevelFilterJson.equals(currentLogLevelFilterJson)) { + client.setData().forPath(nodePath, logLevelFilterJson.getBytes()); + LOG.info("Set log level filter for the log " + e.getKey() + " for cluster " + clusterName); + } + } + } + + @Override + public LogLevelFilterMap getLogLevelFilters(String clusterName) { + String parentPath = String.format("%s/%s/loglevelfilter", root, clusterName); + Map<String, ChildData> logLevelFilterNodes = cache.getCurrentChildren(parentPath); + TreeMap<String, LogLevelFilter> filters = new TreeMap<>(); + for (Map.Entry<String, ChildData> e : logLevelFilterNodes.entrySet()) { + LogLevelFilter logLevelFilter = gson.fromJson(new String(e.getValue().getData()), LogLevelFilter.class); + filters.put(e.getKey(), logLevelFilter); + } + + LogLevelFilterMap logLevelFilters = new LogLevelFilterMap(); + logLevelFilters.setFilter(filters); + return logLevelFilters; + } + + private List<ACL> getAcls() { + String aclStr = properties.get(ZK_ACLS_PROPERTY); + if (StringUtils.isBlank(aclStr)) { + return ZooDefs.Ids.OPEN_ACL_UNSAFE; + } + + List<ACL> acls = new ArrayList<>(); + List<String> aclStrList = Splitter.on(",").omitEmptyStrings().trimResults().splitToList(aclStr); + for (String unparcedAcl : aclStrList) { + String[] parts = unparcedAcl.split(":"); + if (parts.length == 3) { + acls.add(new ACL(parsePermission(parts[2]), new Id(parts[0], parts[1]))); + } + } + return acls; + } + + private Integer parsePermission(String permission) { + int permissionCode = 0; + for (char each : permission.toLowerCase().toCharArray()) { + switch (each) { + case 'r': + permissionCode |= ZooDefs.Perms.READ; + break; + case 'w': + permissionCode |= ZooDefs.Perms.WRITE; + break; + case 'c': + permissionCode |= ZooDefs.Perms.CREATE; + break; + case 'd': + permissionCode |= ZooDefs.Perms.DELETE; + break; + case 'a': + permissionCode |= ZooDefs.Perms.ADMIN; + break; + default: + throw new IllegalArgumentException("Unsupported permission: " + permission); + } + } + return permissionCode; + } + + @Override public void close() { LOG.info("Closing ZooKeeper Connection"); client.close(); http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java index 074fedb..c853f42 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java @@ -29,7 +29,7 @@ import org.apache.ambari.logsearch.config.api.LogSearchConfig.Component; import org.apache.ambari.logsearch.config.zookeeper.LogSearchConfigZK; import org.apache.ambari.logfeeder.input.InputConfigUploader; import org.apache.ambari.logfeeder.input.InputManager; -import org.apache.ambari.logfeeder.logconfig.LogConfigHandler; +import org.apache.ambari.logfeeder.loglevelfilter.LogLevelFilterHandler; import org.apache.ambari.logfeeder.metrics.MetricData; import org.apache.ambari.logfeeder.metrics.MetricsManager; import org.apache.ambari.logfeeder.util.LogFeederUtil; @@ -71,13 +71,13 @@ public class LogFeeder { long startTime = System.currentTimeMillis(); configHandler.init(); - LogConfigHandler.handleConfig(); SSLUtil.ensureStorePasswords(); config = LogSearchConfigFactory.createLogSearchConfig(Component.LOGFEEDER, Maps.fromProperties(LogFeederUtil.getProperties()), LogSearchConfigZK.class); + LogLevelFilterHandler.init(config); InputConfigUploader.load(config); - config.monitorInputConfigChanges(configHandler); + config.monitorInputConfigChanges(configHandler, new LogLevelFilterHandler()); metricsManager.init(); http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java index b70fbd1..8aec690 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java @@ -75,7 +75,7 @@ public class InputConfigUploader extends Thread { String inputConfig = Files.toString(inputConfigFile, Charset.defaultCharset()); if (!config.inputConfigExists(clusterName, serviceName)) { - config.setInputConfig(clusterName, serviceName, inputConfig); + config.createInputConfig(clusterName, serviceName, inputConfig); } filesHandled.add(inputConfigFile.getAbsolutePath()); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FilterLogData.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FilterLogData.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FilterLogData.java deleted file mode 100644 index a05a916..0000000 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FilterLogData.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.ambari.logfeeder.logconfig; - -import java.util.List; -import java.util.Map; - -import org.apache.ambari.logfeeder.common.LogFeederConstants; -import org.apache.ambari.logfeeder.input.InputMarker; -import org.apache.ambari.logfeeder.util.LogFeederUtil; -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.collections.MapUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.log4j.Logger; - -/** - * Read configuration from solr and filter the log - */ -public enum FilterLogData { - INSTANCE; - - private static final Logger LOG = Logger.getLogger(FilterLogData.class); - - private static final boolean DEFAULT_VALUE = true; - - public boolean isAllowed(String jsonBlock, InputMarker inputMarker) { - if (StringUtils.isEmpty(jsonBlock)) { - return DEFAULT_VALUE; - } - Map<String, Object> jsonObj = LogFeederUtil.toJSONObject(jsonBlock); - return isAllowed(jsonObj, inputMarker); - } - - public boolean isAllowed(Map<String, Object> jsonObj, InputMarker inputMarker) { - if ("audit".equals(inputMarker.input.getConfigs().get(LogFeederConstants.ROW_TYPE))) - return true; - - boolean isAllowed = applyFilter(jsonObj); - if (!isAllowed) { - LOG.trace("Filter block the content :" + LogFeederUtil.getGson().toJson(jsonObj)); - } - return isAllowed; - } - - - private boolean applyFilter(Map<String, Object> jsonObj) { - if (MapUtils.isEmpty(jsonObj)) { - LOG.warn("Output jsonobj is empty"); - return DEFAULT_VALUE; - } - - String hostName = (String) jsonObj.get(LogFeederConstants.SOLR_HOST); - String componentName = (String) jsonObj.get(LogFeederConstants.SOLR_COMPONENT); - String level = (String) jsonObj.get(LogFeederConstants.SOLR_LEVEL); - if (StringUtils.isNotBlank(hostName) && StringUtils.isNotBlank(componentName) && StringUtils.isNotBlank(level)) { - LogFeederFilter componentFilter = LogConfigHandler.findComponentFilter(componentName); - if (componentFilter == null) { - return DEFAULT_VALUE; - } - List<String> allowedLevels = LogConfigHandler.getAllowedLevels(hostName, componentFilter); - if (CollectionUtils.isEmpty(allowedLevels)) { - allowedLevels.add(LogFeederConstants.ALL); - } - return LogFeederUtil.isListContains(allowedLevels, level, false); - } - else { - return DEFAULT_VALUE; - } - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogConfigFetcher.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogConfigFetcher.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogConfigFetcher.java deleted file mode 100644 index 12c744c..0000000 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogConfigFetcher.java +++ /dev/null @@ -1,168 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.ambari.logfeeder.logconfig; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - -import org.apache.ambari.logfeeder.common.LogFeederConstants; -import org.apache.ambari.logfeeder.util.LogFeederUtil; -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; -import org.apache.solr.client.solrj.SolrClient; -import org.apache.solr.client.solrj.SolrQuery; -import org.apache.solr.client.solrj.SolrServerException; -import org.apache.solr.client.solrj.SolrRequest.METHOD; -import org.apache.solr.client.solrj.impl.CloudSolrClient; -import org.apache.solr.client.solrj.impl.HttpSolrClient; -import org.apache.solr.client.solrj.request.CollectionAdminRequest; -import org.apache.solr.client.solrj.response.CollectionAdminResponse; -import org.apache.solr.client.solrj.response.QueryResponse; -import org.apache.solr.common.SolrDocument; -import org.apache.solr.common.SolrDocumentList; -import org.apache.solr.common.SolrException; - -public class LogConfigFetcher { - private static final Logger LOG = Logger.getLogger(LogConfigFetcher.class); - - private static LogConfigFetcher instance; - public synchronized static LogConfigFetcher getInstance() { - if (instance == null) { - try { - instance = new LogConfigFetcher(); - } catch (Exception e) { - String logMessageKey = LogConfigFetcher.class.getSimpleName() + "_SOLR_UTIL"; - LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Error constructing solrUtil", e, LOG, Level.WARN); - } - } - return instance; - } - - private SolrClient solrClient; - - private String solrDetail = ""; - - public LogConfigFetcher() throws Exception { - String url = LogFeederUtil.getStringProperty("logfeeder.solr.url"); - String zkConnectString = LogFeederUtil.getStringProperty("logfeeder.solr.zk_connect_string"); - String collection = LogFeederUtil.getStringProperty("logfeeder.solr.core.config.name", "history"); - connectToSolr(url, zkConnectString, collection); - } - - private SolrClient connectToSolr(String url, String zkConnectString, String collection) throws Exception { - solrDetail = "zkConnectString=" + zkConnectString + ", collection=" + collection + ", url=" + url; - - LOG.info("connectToSolr() " + solrDetail); - if (StringUtils.isEmpty(collection)) { - throw new Exception("For solr, collection name is mandatory. " + solrDetail); - } - - if (StringUtils.isEmpty(zkConnectString) && StringUtils.isBlank(url)) - throw new Exception("Both zkConnectString and URL are empty. zkConnectString=" + zkConnectString + ", collection=" + - collection + ", url=" + url); - - if (StringUtils.isNotEmpty(zkConnectString)) { - solrDetail = "zkConnectString=" + zkConnectString + ", collection=" + collection; - LOG.info("Using zookeepr. " + solrDetail); - CloudSolrClient solrClouldClient = new CloudSolrClient(zkConnectString); - solrClouldClient.setDefaultCollection(collection); - solrClient = solrClouldClient; - checkSolrStatus(3 * 60 * 1000); - } else { - solrDetail = "collection=" + collection + ", url=" + url; - String collectionURL = url + "/" + collection; - LOG.info("Connecting to solr : " + collectionURL); - solrClient = new HttpSolrClient(collectionURL); - } - return solrClient; - } - - private boolean checkSolrStatus(int waitDurationMS) { - boolean status = false; - try { - long beginTimeMS = System.currentTimeMillis(); - long waitIntervalMS = 2000; - int pingCount = 0; - while (true) { - pingCount++; - CollectionAdminResponse response = null; - try { - CollectionAdminRequest.List colListReq = new CollectionAdminRequest.List(); - response = colListReq.process(solrClient); - } catch (Exception ex) { - LOG.error("Con't connect to Solr. solrDetail=" + solrDetail, ex); - } - if (response != null && response.getStatus() == 0) { - LOG.info("Solr getCollections() is success. solr=" + solrDetail); - status = true; - break; - } - if (System.currentTimeMillis() - beginTimeMS > waitDurationMS) { - LOG.error("Solr is not reachable even after " + (System.currentTimeMillis() - beginTimeMS) - + " ms. If you are using alias, then you might have to restart LogSearch after Solr is up and running. solr=" - + solrDetail + ", response=" + response); - break; - } else { - LOG.warn("Solr is not reachable yet. getCollections() attempt count=" + pingCount + ". Will sleep for " + - waitIntervalMS + " ms and try again." + " solr=" + solrDetail + ", response=" + response); - } - Thread.sleep(waitIntervalMS); - } - } catch (Throwable t) { - LOG.error("Seems Solr is not up. solrDetail=" + solrDetail, t); - } - return status; - } - - public Map<String, Object> getConfigDoc() { - HashMap<String, Object> configMap = new HashMap<String, Object>(); - SolrQuery solrQuery = new SolrQuery(); - solrQuery.setQuery("*:*"); - String fq = LogFeederConstants.ROW_TYPE + ":" + LogFeederConstants.LOGFEEDER_FILTER_NAME; - solrQuery.setFilterQueries(fq); - try { - QueryResponse response = process(solrQuery); - if (response != null) { - SolrDocumentList documentList = response.getResults(); - if (CollectionUtils.isNotEmpty(documentList)) { - SolrDocument configDoc = documentList.get(0); - String configJson = LogFeederUtil.getGson().toJson(configDoc); - configMap = (HashMap<String, Object>) LogFeederUtil.toJSONObject(configJson); - } - } - } catch (Exception e) { - String logMessageKey = this.getClass().getSimpleName() + "_FETCH_FILTER_CONFIG_ERROR"; - LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Error getting filter config from solr", e, LOG, Level.ERROR); - } - return configMap; - } - - private QueryResponse process(SolrQuery solrQuery) throws SolrServerException, IOException, SolrException { - if (solrClient != null) { - QueryResponse queryResponse = solrClient.query(solrQuery, METHOD.POST); - return queryResponse; - } else { - LOG.error("solrClient can't be null"); - return null; - } - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandler.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandler.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandler.java deleted file mode 100644 index 0ece637..0000000 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandler.java +++ /dev/null @@ -1,213 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.ambari.logfeeder.logconfig; - -import java.text.DateFormat; -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.TimeZone; - -import org.apache.ambari.logfeeder.common.LogFeederConstants; -import org.apache.ambari.logfeeder.util.LogFeederUtil; -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.log4j.Logger; - -public class LogConfigHandler extends Thread { - private static final Logger LOG = Logger.getLogger(LogConfigHandler.class); - - private static final int DEFAULT_SOLR_CONFIG_INTERVAL = 5; - private static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS"; - private static final String TIMEZONE = "GMT"; - private static final int RETRY_INTERVAL = 30; - - static { - TimeZone.setDefault(TimeZone.getTimeZone(TIMEZONE)); - } - - private static ThreadLocal<DateFormat> formatter = new ThreadLocal<DateFormat>() { - protected DateFormat initialValue() { - SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT); - dateFormat.setTimeZone(TimeZone.getTimeZone(TIMEZONE)); - return dateFormat; - } - }; - - private static boolean filterEnabled; - private static LogFeederFilterWrapper logFeederFilterWrapper; - - private static boolean running = false; - - public static void handleConfig() { - filterEnabled = LogFeederUtil.getBooleanProperty("logfeeder.log.filter.enable", false); - if (!filterEnabled) { - LOG.info("Logfeeder filter Scheduler is disabled."); - return; - } - if (!running) { - new LogConfigHandler().start(); - running = true; - LOG.info("Logfeeder Filter Thread started!"); - } else { - LOG.warn("Logfeeder Filter Thread is already running."); - } - } - - private LogConfigHandler() { - setName(getClass().getSimpleName()); - setDaemon(true); - } - - @Override - public void run() { - String zkConnectString = LogFeederUtil.getStringProperty("logfeeder.solr.zk_connect_string"); - String solrUrl = LogFeederUtil.getStringProperty("logfeeder.solr.url"); - if (StringUtils.isBlank(zkConnectString) && StringUtils.isBlank(solrUrl)) { - LOG.warn("Neither Solr ZK Connect String nor solr Url for UserConfig/History is set." + - "Won't look for level configuration from Solr."); - return; - } - - int solrConfigInterval = LogFeederUtil.getIntProperty("logfeeder.solr.config.interval", DEFAULT_SOLR_CONFIG_INTERVAL); - do { - LOG.debug("Updating config from solr after every " + solrConfigInterval + " sec."); - fetchConfig(); - try { - Thread.sleep(1000 * solrConfigInterval); - } catch (InterruptedException e) { - LOG.error(e.getLocalizedMessage(), e.getCause()); - } - } while (true); - } - - private synchronized void fetchConfig() { - LogConfigFetcher fetcher = LogConfigFetcher.getInstance(); - if (fetcher != null) { - Map<String, Object> configDocMap = fetcher.getConfigDoc(); - String configJson = (String) configDocMap.get(LogFeederConstants.VALUES); - if (configJson != null) { - logFeederFilterWrapper = LogFeederUtil.getGson().fromJson(configJson, LogFeederFilterWrapper.class); - } - } - } - - public static boolean isFilterAvailable() { - return logFeederFilterWrapper != null; - } - - public static List<String> getAllowedLevels(String hostName, LogFeederFilter componentFilter) { - String componentName = componentFilter.getLabel(); - List<String> hosts = componentFilter.getHosts(); - List<String> defaultLevels = componentFilter.getDefaultLevels(); - List<String> overrideLevels = componentFilter.getOverrideLevels(); - String expiryTime = componentFilter.getExpiryTime(); - - // check is user override or not - if (StringUtils.isNotEmpty(expiryTime) || CollectionUtils.isNotEmpty(overrideLevels) || CollectionUtils.isNotEmpty(hosts)) { - if (CollectionUtils.isEmpty(hosts)) { // hosts list is empty or null consider it apply on all hosts - hosts.add(LogFeederConstants.ALL); - } - - if (LogFeederUtil.isListContains(hosts, hostName, false)) { - if (isFilterExpired(componentFilter)) { - LOG.debug("Filter for component " + componentName + " and host :" + hostName + " is expired at " + - componentFilter.getExpiryTime()); - return defaultLevels; - } else { - return overrideLevels; - } - } - } - return defaultLevels; - } - - private static boolean isFilterExpired(LogFeederFilter logfeederFilter) { - if (logfeederFilter == null) - return false; - - Date filterEndDate = parseFilterExpireDate(logfeederFilter); - if (filterEndDate == null) { - return false; - } - - Date currentDate = new Date(); - if (!currentDate.before(filterEndDate)) { - LOG.debug("Filter for Component :" + logfeederFilter.getLabel() + " and Hosts : [" + - StringUtils.join(logfeederFilter.getHosts(), ',') + "] is expired because of filter endTime : " + - formatter.get().format(filterEndDate) + " is older than currentTime :" + formatter.get().format(currentDate)); - return true; - } else { - return false; - } - } - - private static Date parseFilterExpireDate(LogFeederFilter vLogfeederFilter) { - String expiryTime = vLogfeederFilter.getExpiryTime(); - if (StringUtils.isNotEmpty(expiryTime)) { - try { - return formatter.get().parse(expiryTime); - } catch (ParseException e) { - LOG.error("Filter have invalid ExpiryTime : " + expiryTime + " for component :" + vLogfeederFilter.getLabel() - + " and hosts : [" + StringUtils.join(vLogfeederFilter.getHosts(), ',') + "]"); - } - } - return null; - } - - public static LogFeederFilter findComponentFilter(String componentName) { - waitForFilter(); - - if (logFeederFilterWrapper != null) { - HashMap<String, LogFeederFilter> filter = logFeederFilterWrapper.getFilter(); - if (filter != null) { - LogFeederFilter componentFilter = filter.get(componentName); - if (componentFilter != null) { - return componentFilter; - } - } - } - LOG.trace("Filter is not there for component :" + componentName); - return null; - } - - private static void waitForFilter() { - if (!filterEnabled || logFeederFilterWrapper != null) { - return; - } - - while (true) { - try { - Thread.sleep(RETRY_INTERVAL * 1000); - } catch (InterruptedException e) { - LOG.error(e); - } - - LOG.info("Checking if config is available"); - if (logFeederFilterWrapper != null) { - LOG.info("Config is available"); - return; - } - } - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederFilter.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederFilter.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederFilter.java deleted file mode 100644 index 60c8ae8..0000000 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederFilter.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.ambari.logfeeder.logconfig; - -import java.util.ArrayList; -import java.util.List; - -import javax.xml.bind.annotation.XmlAccessType; -import javax.xml.bind.annotation.XmlAccessorType; -import javax.xml.bind.annotation.XmlRootElement; - -import org.codehaus.jackson.annotate.JsonAutoDetect; -import org.codehaus.jackson.annotate.JsonAutoDetect.Visibility; -import org.codehaus.jackson.map.annotate.JsonSerialize; - -@JsonAutoDetect(getterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE, fieldVisibility = Visibility.ANY) -@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) -@XmlRootElement -@XmlAccessorType(XmlAccessType.FIELD) -public class LogFeederFilter { - - private String label; - private List<String> hosts; - private List<String> defaultLevels; - private List<String> overrideLevels; - private String expiryTime; - - public LogFeederFilter() { - hosts = new ArrayList<String>(); - defaultLevels = new ArrayList<String>(); - overrideLevels = new ArrayList<String>(); - } - - public String getLabel() { - return label; - } - - public void setLabel(String label) { - this.label = label; - } - - public List<String> getHosts() { - return hosts; - } - - public void setHosts(List<String> hosts) { - this.hosts = hosts; - } - - public List<String> getDefaultLevels() { - return defaultLevels; - } - - public void setDefaultLevels(List<String> defaultLevels) { - this.defaultLevels = defaultLevels; - } - - public List<String> getOverrideLevels() { - return overrideLevels; - } - - public void setOverrideLevels(List<String> overrideLevels) { - this.overrideLevels = overrideLevels; - } - - public String getExpiryTime() { - return expiryTime; - } - - public void setExpiryTime(String expiryTime) { - this.expiryTime = expiryTime; - } - -} http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederFilterWrapper.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederFilterWrapper.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederFilterWrapper.java deleted file mode 100644 index 9199cd3..0000000 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederFilterWrapper.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.ambari.logfeeder.logconfig; - -import java.util.HashMap; - -import javax.xml.bind.annotation.XmlAccessType; -import javax.xml.bind.annotation.XmlAccessorType; -import javax.xml.bind.annotation.XmlRootElement; - -import org.codehaus.jackson.annotate.JsonAutoDetect; -import org.codehaus.jackson.annotate.JsonAutoDetect.Visibility; -import org.codehaus.jackson.map.annotate.JsonSerialize; - -@JsonAutoDetect(getterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE, fieldVisibility = Visibility.ANY) -@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) -@XmlRootElement -@XmlAccessorType(XmlAccessType.FIELD) -public class LogFeederFilterWrapper { - - private HashMap<String, LogFeederFilter> filter; - private String id; - - public HashMap<String, LogFeederFilter> getFilter() { - return filter; - } - - public void setFilter(HashMap<String, LogFeederFilter> filter) { - this.filter = filter; - } - - public String getId() { - return id; - } - - public void setId(String id) { - this.id = id; - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/FilterLogData.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/FilterLogData.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/FilterLogData.java new file mode 100644 index 0000000..1f635af --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/FilterLogData.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ambari.logfeeder.loglevelfilter; + +import java.util.Map; + +import org.apache.ambari.logfeeder.common.LogFeederConstants; +import org.apache.ambari.logfeeder.input.InputMarker; +import org.apache.ambari.logfeeder.util.LogFeederUtil; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.log4j.Logger; + +public enum FilterLogData { + INSTANCE; + + private static final Logger LOG = Logger.getLogger(FilterLogData.class); + + private static final boolean DEFAULT_VALUE = true; + + public boolean isAllowed(String jsonBlock, InputMarker inputMarker) { + if (StringUtils.isEmpty(jsonBlock)) { + return DEFAULT_VALUE; + } + Map<String, Object> jsonObj = LogFeederUtil.toJSONObject(jsonBlock); + return isAllowed(jsonObj, inputMarker); + } + + public boolean isAllowed(Map<String, Object> jsonObj, InputMarker inputMarker) { + if ("audit".equals(inputMarker.input.getConfigs().get(LogFeederConstants.ROW_TYPE))) + return true; + + boolean isAllowed = applyFilter(jsonObj); + if (!isAllowed) { + LOG.trace("Filter block the content :" + LogFeederUtil.getGson().toJson(jsonObj)); + } + return isAllowed; + } + + + private boolean applyFilter(Map<String, Object> jsonObj) { + if (MapUtils.isEmpty(jsonObj)) { + LOG.warn("Output jsonobj is empty"); + return DEFAULT_VALUE; + } + + String hostName = (String) jsonObj.get(LogFeederConstants.SOLR_HOST); + String logId = (String) jsonObj.get(LogFeederConstants.SOLR_COMPONENT); + String level = (String) jsonObj.get(LogFeederConstants.SOLR_LEVEL); + if (StringUtils.isNotBlank(hostName) && StringUtils.isNotBlank(logId) && StringUtils.isNotBlank(level)) { + return LogLevelFilterHandler.isAllowed(hostName, logId, level); + } else { + return DEFAULT_VALUE; + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/LogLevelFilterHandler.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/LogLevelFilterHandler.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/LogLevelFilterHandler.java new file mode 100644 index 0000000..8a4d953 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/LogLevelFilterHandler.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ambari.logfeeder.loglevelfilter; + +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Arrays; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TimeZone; + +import org.apache.ambari.logfeeder.common.LogFeederConstants; +import org.apache.ambari.logfeeder.util.LogFeederUtil; +import org.apache.ambari.logsearch.config.api.LogLevelFilterMonitor; +import org.apache.ambari.logsearch.config.api.LogSearchConfig; +import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.log4j.Logger; + +public class LogLevelFilterHandler implements LogLevelFilterMonitor { + private static final Logger LOG = Logger.getLogger(LogLevelFilterHandler.class); + + private static final String TIMEZONE = "GMT"; + private static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS"; + + private static ThreadLocal<DateFormat> formatter = new ThreadLocal<DateFormat>() { + protected DateFormat initialValue() { + SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT); + dateFormat.setTimeZone(TimeZone.getTimeZone(TIMEZONE)); + return dateFormat; + } + }; + + private static LogSearchConfig config; + private static String clusterName = LogFeederUtil.getStringProperty("cluster.name"); + private static boolean filterEnabled; + private static List<String> defaultLogLevels; + private static Map<String, LogLevelFilter> filters = new HashMap<>(); + + public static void init(LogSearchConfig config_) { + config = config_; + filterEnabled = LogFeederUtil.getBooleanProperty("logfeeder.log.filter.enable", false); + defaultLogLevels = Arrays.asList(LogFeederUtil.getStringProperty("logfeeder.include.default.level").split(",")); + TimeZone.setDefault(TimeZone.getTimeZone(TIMEZONE)); + } + + @Override + public void setLogLevelFilter(String logId, LogLevelFilter logLevelFilter) { + synchronized (LogLevelFilterHandler.class) { + filters.put(logId, logLevelFilter); + } + } + + @Override + public void removeLogLevelFilter(String logId) { + synchronized (LogLevelFilterHandler.class) { + filters.remove(logId); + } + } + + public static boolean isAllowed(String hostName, String logId, String level) { + if (!filterEnabled) { + return true; + } + + LogLevelFilter logFilter = findLogFilter(logId); + List<String> allowedLevels = getAllowedLevels(hostName, logFilter); + return allowedLevels.isEmpty() || allowedLevels.contains(level); + } + + private static synchronized LogLevelFilter findLogFilter(String logId) { + LogLevelFilter logFilter = filters.get(logId); + if (logFilter != null) { + return logFilter; + } + + LOG.info("Filter is not present for log " + logId + ", creating default filter"); + LogLevelFilter defaultFilter = new LogLevelFilter(); + defaultFilter.setLabel(logId); + defaultFilter.setDefaultLevels(defaultLogLevels); + + try { + config.createLogLevelFilter(clusterName, logId, defaultFilter); + filters.put(logId, defaultFilter); + } catch (Exception e) { + LOG.warn("Could not persist the default filter for log " + logId, e); + } + + return defaultFilter; + } + + private static List<String> getAllowedLevels(String hostName, LogLevelFilter componentFilter) { + String componentName = componentFilter.getLabel(); + List<String> hosts = componentFilter.getHosts(); + List<String> defaultLevels = componentFilter.getDefaultLevels(); + List<String> overrideLevels = componentFilter.getOverrideLevels(); + Date expiryTime = componentFilter.getExpiryTime(); + + // check is user override or not + if (expiryTime != null || CollectionUtils.isNotEmpty(overrideLevels) || CollectionUtils.isNotEmpty(hosts)) { + if (CollectionUtils.isEmpty(hosts)) { // hosts list is empty or null consider it apply on all hosts + hosts.add(LogFeederConstants.ALL); + } + + if (hosts.isEmpty() || hosts.contains(hostName)) { + if (isFilterExpired(componentFilter)) { + LOG.debug("Filter for component " + componentName + " and host :" + hostName + " is expired at " + + componentFilter.getExpiryTime()); + return defaultLevels; + } else { + return overrideLevels; + } + } + } + return defaultLevels; + } + + private static boolean isFilterExpired(LogLevelFilter logLevelFilter) { + if (logLevelFilter == null) + return false; + + Date filterEndDate = logLevelFilter.getExpiryTime(); + if (filterEndDate == null) { + return false; + } + + Date currentDate = new Date(); + if (!currentDate.before(filterEndDate)) { + LOG.debug("Filter for Component :" + logLevelFilter.getLabel() + " and Hosts : [" + + StringUtils.join(logLevelFilter.getHosts(), ',') + "] is expired because of filter endTime : " + + formatter.get().format(filterEndDate) + " is older than currentTime :" + formatter.get().format(currentDate)); + return true; + } else { + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java index 135fb32..ba872f8 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java @@ -29,7 +29,7 @@ import java.util.UUID; import org.apache.ambari.logfeeder.common.LogFeederConstants; import org.apache.ambari.logfeeder.input.Input; import org.apache.ambari.logfeeder.input.InputMarker; -import org.apache.ambari.logfeeder.logconfig.FilterLogData; +import org.apache.ambari.logfeeder.loglevelfilter.FilterLogData; import org.apache.ambari.logfeeder.metrics.MetricData; import org.apache.ambari.logfeeder.util.LogFeederUtil; import org.apache.ambari.logfeeder.util.MurmurHash; http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java index bb2f0a9..1929178 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java @@ -32,7 +32,6 @@ import java.util.Map; import java.util.Properties; import org.apache.ambari.logfeeder.LogFeeder; -import org.apache.ambari.logfeeder.common.LogFeederConstants; import org.apache.ambari.logfeeder.metrics.MetricData; import org.apache.commons.collections.MapUtils; import org.apache.commons.lang3.StringUtils; @@ -311,24 +310,6 @@ public class LogFeederUtil { return false; } } - - public static boolean isListContains(List<String> list, String str, boolean caseSensitive) { - if (list == null) { - return false; - } - - for (String value : list) { - if (value == null) { - continue; - } - - if (caseSensitive ? value.equals(str) : value.equalsIgnoreCase(str) || - value.equalsIgnoreCase(LogFeederConstants.ALL)) { - return true; - } - } - return false; - } private static String logfeederTempDir = null; http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandlerTest.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandlerTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandlerTest.java index 266108f..44314c6 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandlerTest.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandlerTest.java @@ -18,8 +18,9 @@ package org.apache.ambari.logfeeder.logconfig; -import java.lang.reflect.Field; +import java.util.Arrays; import java.util.Collections; +import java.util.Date; import java.util.HashMap; import java.util.Map; @@ -29,15 +30,17 @@ import static org.junit.Assert.*; import org.apache.ambari.logfeeder.common.LogFeederConstants; import org.apache.ambari.logfeeder.input.Input; import org.apache.ambari.logfeeder.input.InputMarker; +import org.apache.ambari.logfeeder.loglevelfilter.FilterLogData; +import org.apache.ambari.logfeeder.loglevelfilter.LogLevelFilterHandler; import org.apache.ambari.logfeeder.util.LogFeederUtil; -import org.junit.AfterClass; +import org.apache.ambari.logsearch.config.api.LogSearchConfig; +import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter; +import org.apache.commons.lang.time.DateUtils; import org.junit.BeforeClass; import org.junit.Test; public class LogConfigHandlerTest { - private static LogConfigFetcher mockFetcher; - private static InputMarker inputMarkerAudit; private static InputMarker inputMarkerService; static { @@ -56,47 +59,41 @@ public class LogConfigHandlerTest { replay(auditInput, serviceInput); } - private static final Map<String, Object> CONFIG_MAP = new HashMap<>(); - static { - CONFIG_MAP.put("jsons", - "{'filter':{" + - "'configured_log_file':{" + - "'label':'configured_log_file'," + - "'hosts':[]," + - "'defaultLevels':['FATAL','ERROR','WARN','INFO']," + - "'overrideLevels':[]}," + - "'configured_log_file2':{" + - "'label':'configured_log_file2'," + - "'hosts':['host1']," + - "'defaultLevels':['FATAL','ERROR','WARN','INFO']," + - "'overrideLevels':['FATAL','ERROR','WARN','INFO','DEBUG','TRACE']," + - "'expiryTime':'3000-01-01T00:00:00.000Z'}," + - "'configured_log_file3':{" + - "'label':'configured_log_file3'," + - "'hosts':['host1']," + - "'defaultLevels':['FATAL','ERROR','WARN','INFO']," + - "'overrideLevels':['FATAL','ERROR','WARN','INFO','DEBUG','TRACE']," + - "'expiryTime':'1000-01-01T00:00:00.000Z'}" + - "}}"); - } - @BeforeClass public static void init() throws Exception { - mockFetcher = strictMock(LogConfigFetcher.class); - Field f = LogConfigFetcher.class.getDeclaredField("instance"); - f.setAccessible(true); - f.set(null, mockFetcher); - expect(mockFetcher.getConfigDoc()).andReturn(CONFIG_MAP).anyTimes(); - replay(mockFetcher); - LogFeederUtil.loadProperties("logfeeder.properties", null); - LogConfigHandler.handleConfig(); - Thread.sleep(1000); + + LogSearchConfig config = strictMock(LogSearchConfig.class); + config.createLogLevelFilter(anyString(), anyString(), anyObject(LogLevelFilter.class)); + expectLastCall().anyTimes(); + LogLevelFilterHandler.init(config); + + LogLevelFilter logLevelFilter1 = new LogLevelFilter(); + logLevelFilter1.setHosts(Collections.<String> emptyList()); + logLevelFilter1.setDefaultLevels(Arrays.asList("FATAL", "ERROR", "WARN", "INFO")); + logLevelFilter1.setOverrideLevels(Collections.<String> emptyList()); + + LogLevelFilter logLevelFilter2 = new LogLevelFilter(); + logLevelFilter2.setHosts(Arrays.asList("host1")); + logLevelFilter2.setDefaultLevels(Arrays.asList("FATAL", "ERROR", "WARN", "INFO")); + logLevelFilter2.setOverrideLevels(Arrays.asList("FATAL", "ERROR", "WARN", "INFO", "DEBUG", "TRACE")); + logLevelFilter2.setExpiryTime(DateUtils.addDays(new Date(), 1)); + + LogLevelFilter logLevelFilter3 = new LogLevelFilter(); + logLevelFilter3.setHosts(Arrays.asList("host1")); + logLevelFilter3.setDefaultLevels(Arrays.asList("FATAL", "ERROR", "WARN", "INFO")); + logLevelFilter3.setOverrideLevels(Arrays.asList("FATAL", "ERROR", "WARN", "INFO", "DEBUG", "TRACE")); + logLevelFilter3.setExpiryTime(DateUtils.addDays(new Date(), -1)); + + LogLevelFilterHandler h = new LogLevelFilterHandler(); + h.setLogLevelFilter("configured_log_file1", logLevelFilter1); + h.setLogLevelFilter("configured_log_file2", logLevelFilter2); + h.setLogLevelFilter("configured_log_file3", logLevelFilter3); } @Test public void testLogConfigHandler_auditAllowed() throws Exception { - assertTrue(FilterLogData.INSTANCE.isAllowed("{'host':'host1', 'type':'configured_log_file', 'level':'DEBUG'}", + assertTrue(FilterLogData.INSTANCE.isAllowed("{'host':'host1', 'type':'configured_log_file1', 'level':'DEBUG'}", inputMarkerAudit)); } @@ -109,19 +106,25 @@ public class LogConfigHandlerTest { @Test public void testLogConfigHandler_notConfiguredLogAllowed() throws Exception { - assertTrue(FilterLogData.INSTANCE.isAllowed("{'host':'host1', 'type':'not_configured_log_file', 'level':'INFO'}", + assertTrue(FilterLogData.INSTANCE.isAllowed("{'host':'host1', 'type':'not_configured_log_file1', 'level':'WARN'}", + inputMarkerService)); + } + + @Test + public void testLogConfigHandler_notConfiguredLogNotAllowed() throws Exception { + assertFalse(FilterLogData.INSTANCE.isAllowed("{'host':'host1', 'type':'not_configured_log_file1', 'level':'TRACE'}", inputMarkerService)); } @Test public void testLogConfigHandler_configuredDataAllow() throws Exception { - assertTrue(FilterLogData.INSTANCE.isAllowed("{'host':'host1', 'type':'configured_log_file', 'level':'INFO'}", + assertTrue(FilterLogData.INSTANCE.isAllowed("{'host':'host1', 'type':'configured_log_file1', 'level':'INFO'}", inputMarkerService)); } @Test public void testLogConfigHandler_configuredDataDontAllow() throws Exception { - assertFalse(FilterLogData.INSTANCE.isAllowed("{'host':'host1', 'type':'configured_log_file', 'level':'DEBUG'}", + assertFalse(FilterLogData.INSTANCE.isAllowed("{'host':'host1', 'type':'configured_log_file1', 'level':'DEBUG'}", inputMarkerService)); } @@ -142,9 +145,4 @@ public class LogConfigHandlerTest { assertFalse(FilterLogData.INSTANCE.isAllowed("{'host':'host1', 'type':'configured_log_file3', 'level':'DEBUG'}", inputMarkerService)); } - - @AfterClass - public static void finish() { - verify(mockFetcher); - } } http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-logfeeder/src/test/resources/logfeeder.properties ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/resources/logfeeder.properties b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/resources/logfeeder.properties index 59020cc..19027d1 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/resources/logfeeder.properties +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/resources/logfeeder.properties @@ -17,4 +17,5 @@ logfeeder.log.filter.enable=true logfeeder.solr.config.interval=5 logfeeder.solr.zk_connect_string=some_connect_string logfeeder.metrics.collector.hosts=some_collector_host -node.hostname=test_host_name \ No newline at end of file +node.hostname=test_host_name +logfeeder.include.default.level=FATAL,ERROR,WARN \ No newline at end of file