AMBARI-20881 Add Log Level Filter to the Log Search config API (mgergely)

Change-Id: I8e3d5a628d02407ad2af4ecb77fff3ada10f7707


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/3b94d3cf
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/3b94d3cf
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/3b94d3cf

Branch: refs/heads/trunk
Commit: 3b94d3cff380bf3557af57c6ecd9005ab46b8916
Parents: f18a822
Author: Miklos Gergely <mgerg...@hortonworks.com>
Authored: Wed May 17 14:47:06 2017 +0200
Committer: Miklos Gergely <mgerg...@hortonworks.com>
Committed: Wed May 17 14:47:40 2017 +0200

----------------------------------------------------------------------
 .../ambari-logsearch-config-api/pom.xml         |   2 +-
 .../config/api/InputConfigMonitor.java          |   4 +-
 .../config/api/LogLevelFilterMonitor.java       |  44 ++++
 .../logsearch/config/api/LogSearchConfig.java   |  57 ++++-
 .../model/loglevelfilter/LogLevelFilter.java    |  79 +++++++
 .../model/loglevelfilter/LogLevelFilterMap.java |  33 +++
 .../config/api/LogSearchConfigClass1.java       |  21 +-
 .../config/api/LogSearchConfigClass2.java       |  21 +-
 .../ambari-logsearch-config-zookeeper/pom.xml   |   4 +
 .../config/zookeeper/LogSearchConfigZK.java     | 191 ++++++++++++-----
 .../org/apache/ambari/logfeeder/LogFeeder.java  |   6 +-
 .../logfeeder/input/InputConfigUploader.java    |   2 +-
 .../logfeeder/logconfig/FilterLogData.java      |  87 --------
 .../logfeeder/logconfig/LogConfigFetcher.java   | 168 ---------------
 .../logfeeder/logconfig/LogConfigHandler.java   | 213 -------------------
 .../logfeeder/logconfig/LogFeederFilter.java    |  90 --------
 .../logconfig/LogFeederFilterWrapper.java       |  55 -----
 .../logfeeder/loglevelfilter/FilterLogData.java |  73 +++++++
 .../loglevelfilter/LogLevelFilterHandler.java   | 157 ++++++++++++++
 .../ambari/logfeeder/output/OutputManager.java  |   2 +-
 .../ambari/logfeeder/util/LogFeederUtil.java    |  19 --
 .../logconfig/LogConfigHandlerTest.java         |  90 ++++----
 .../src/test/resources/logfeeder.properties     |   3 +-
 .../configurer/LogfeederFilterConfigurer.java   |  66 ------
 .../ambari/logsearch/dao/UserConfigSolrDao.java |  79 -------
 .../ambari/logsearch/doc/DocConstants.java      |  10 +-
 .../logsearch/manager/ShipperConfigManager.java |  45 +++-
 .../logsearch/manager/UserConfigManager.java    |  24 ---
 .../model/common/LSServerLogLevelFilter.java    | 100 +++++++++
 .../model/common/LSServerLogLevelFilterMap.java |  65 ++++++
 .../model/common/LogFeederDataMap.java          |  50 -----
 .../model/common/LogfeederFilterData.java       |  87 --------
 .../logsearch/rest/ShipperConfigResource.java   |  43 +++-
 .../logsearch/rest/UserConfigResource.java      |  18 --
 .../webapp/templates/common/Header_tmpl.html    |   5 +-
 .../server/upgrade/UpgradeCatalog300.java       |  15 ++
 .../configuration/logfeeder-properties.xml      |  10 +
 .../configuration/logsearch-properties.xml      |  10 -
 .../LOGSEARCH/0.5.0/themes/theme.json           |   4 +-
 .../server/upgrade/UpgradeCatalog300Test.java   |  29 +++
 40 files changed, 982 insertions(+), 1099 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-config-api/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/pom.xml 
b/ambari-logsearch/ambari-logsearch-config-api/pom.xml
index e9abed0..72fcc80 100644
--- a/ambari-logsearch/ambari-logsearch-config-api/pom.xml
+++ b/ambari-logsearch/ambari-logsearch-config-api/pom.xml
@@ -33,7 +33,7 @@
   <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
   </properties>
-  
+
   <dependencies>
     <dependency>
       <groupId>junit</groupId>

http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/InputConfigMonitor.java
----------------------------------------------------------------------
diff --git 
a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/InputConfigMonitor.java
 
b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/InputConfigMonitor.java
index df26920..29a82a6 100644
--- 
a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/InputConfigMonitor.java
+++ 
b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/InputConfigMonitor.java
@@ -20,7 +20,7 @@
 package org.apache.ambari.logsearch.config.api;
 
 /**
- * Monitors input configuration changes. 
+ * Monitors input configuration changes.
  */
 public interface InputConfigMonitor {
   /**
@@ -31,7 +31,7 @@ public interface InputConfigMonitor {
    * @throws Exception
    */
   void loadInputConfigs(String serviceName, String inputConfig) throws 
Exception;
-  
+
   /**
    * Notification of the removal of an input configuration.
    * 

http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogLevelFilterMonitor.java
----------------------------------------------------------------------
diff --git 
a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogLevelFilterMonitor.java
 
b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogLevelFilterMonitor.java
new file mode 100644
index 0000000..766f751
--- /dev/null
+++ 
b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogLevelFilterMonitor.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Monitors log level filter changes.
+ */
+package org.apache.ambari.logsearch.config.api;
+
+import 
org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter;
+
+public interface LogLevelFilterMonitor {
+
+  /**
+   * Notification of a new or updated log level filter.
+   * 
+   * @param logId The log for which the log level filter was created/updated.
+   * @param logLevelFilter The log level filter to apply from now on to the 
log.
+   */
+  void setLogLevelFilter(String logId, LogLevelFilter logLevelFilter);
+
+  /**
+   * Notification of the removal of a log level filter.
+   * 
+   * @param logId The log of which's log level filter was removed.
+   */
+  void removeLogLevelFilter(String logId);
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java
----------------------------------------------------------------------
diff --git 
a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java
 
b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java
index 0bb0b78..07921d0 100644
--- 
a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java
+++ 
b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java
@@ -23,6 +23,9 @@ import java.io.Closeable;
 import java.util.List;
 import java.util.Map;
 
+import 
org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter;
+import 
org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilterMap;
+
 /**
  * Log Search Configuration, which uploads, retrieves configurations, and 
monitors it's changes.
  */
@@ -33,7 +36,7 @@ public interface LogSearchConfig extends Closeable {
   public enum Component {
     SERVER, LOGFEEDER;
   }
-  
+
   /**
    * Initialization of the configuration.
    * 
@@ -42,7 +45,7 @@ public interface LogSearchConfig extends Closeable {
    * @throws Exception
    */
   void init(Component component, Map<String, String> properties) throws 
Exception;
-  
+
   /**
    * Returns all the service names with input configurations of a cluster. 
Will be used only in SERVER mode.
    * 
@@ -50,7 +53,7 @@ public interface LogSearchConfig extends Closeable {
    * @return List of the service names.
    */
   List<String> getServices(String clusterName);
-  
+
   /**
    * Checks if input configuration exists.
    * 
@@ -60,7 +63,7 @@ public interface LogSearchConfig extends Closeable {
    * @throws Exception
    */
   boolean inputConfigExists(String clusterName, String serviceName) throws 
Exception;
-  
+
   /**
    * Returns the input configuration of a service in a cluster. Will be used 
only in SERVER mode.
    * 
@@ -69,7 +72,7 @@ public interface LogSearchConfig extends Closeable {
    * @return The input configuration for the service if it exists, null 
otherwise.
    */
   String getInputConfig(String clusterName, String serviceName);
-  
+
   /**
    * Uploads the input configuration for a service in a cluster.
    * 
@@ -78,13 +81,51 @@ public interface LogSearchConfig extends Closeable {
    * @param inputConfig The input configuration of the service.
    * @throws Exception
    */
+  void createInputConfig(String clusterName, String serviceName, String 
inputConfig) throws Exception;
+
+  /**
+   * Modifies the input configuration for a service in a cluster.
+   * 
+   * @param clusterName The name of the cluster where the service is.
+   * @param serviceName The name of the service of which's input configuration 
is uploaded.
+   * @param inputConfig The input configuration of the service.
+   * @throws Exception
+   */
   void setInputConfig(String clusterName, String serviceName, String 
inputConfig) throws Exception;
-  
+
+  /**
+   * Uploads the log level filter of a log.
+   * 
+   * @param clusterName The name of the cluster where the log is.
+   * @param logId The id of the log.
+   * @param filter The log level filter for the log.
+   * @throws Exception 
+   */
+  void createLogLevelFilter(String clusterName, String logId, LogLevelFilter 
filter) throws Exception;
+
+  /**
+   * Modifies the log level filters for all the logs.
+   * 
+   * @param clusterName The name of the cluster where the logs are.
+   * @param filters The log level filters to set.
+   * @throws Exception
+   */
+  void setLogLevelFilters(String clusterName, LogLevelFilterMap filters) 
throws Exception;
+
+  /**
+   * Returns the Log Level Filters of a cluster.
+   * 
+   * @param clusterName The name of the cluster which's log level filters are 
required.
+   * @return All the log level filters of the cluster.
+   */
+  LogLevelFilterMap getLogLevelFilters(String clusterName);
+
   /**
    * Starts the monitoring of the input configurations, asynchronously. Will 
be used only in LOGFEEDER mode.
    * 
-   * @param configMonitor The input config monitor to call in case of a config 
change.
+   * @param inputConfigMonitor The input config monitor to call in case of an 
input config change.
+   * @param logLevelFilterMonitor The log level filter monitor to call in case 
of a log level filter change.
    * @throws Exception
    */
-  void monitorInputConfigChanges(InputConfigMonitor configMonitor) throws 
Exception;
+  void monitorInputConfigChanges(InputConfigMonitor inputConfigMonitor, 
LogLevelFilterMonitor logLevelFilterMonitor) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/loglevelfilter/LogLevelFilter.java
----------------------------------------------------------------------
diff --git 
a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/loglevelfilter/LogLevelFilter.java
 
b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/loglevelfilter/LogLevelFilter.java
new file mode 100644
index 0000000..06cf589
--- /dev/null
+++ 
b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/loglevelfilter/LogLevelFilter.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logsearch.config.api.model.loglevelfilter;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+public class LogLevelFilter {
+
+  private String label;
+  private List<String> hosts;
+  private List<String> defaultLevels;
+  private List<String> overrideLevels;
+  private Date expiryTime;
+
+  public LogLevelFilter() {
+    hosts = new ArrayList<String>();
+    defaultLevels = new ArrayList<String>();
+    overrideLevels = new ArrayList<String>();
+  }
+
+  public String getLabel() {
+    return label;
+  }
+
+  public void setLabel(String label) {
+    this.label = label;
+  }
+
+  public List<String> getHosts() {
+    return hosts;
+  }
+
+  public void setHosts(List<String> hosts) {
+    this.hosts = hosts;
+  }
+
+  public List<String> getDefaultLevels() {
+    return defaultLevels;
+  }
+
+  public void setDefaultLevels(List<String> defaultLevels) {
+    this.defaultLevels = defaultLevels;
+  }
+
+  public List<String> getOverrideLevels() {
+    return overrideLevels;
+  }
+
+  public void setOverrideLevels(List<String> overrideLevels) {
+    this.overrideLevels = overrideLevels;
+  }
+
+  public Date getExpiryTime() {
+    return expiryTime;
+  }
+
+  public void setExpiryTime(Date expiryTime) {
+    this.expiryTime = expiryTime;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/loglevelfilter/LogLevelFilterMap.java
----------------------------------------------------------------------
diff --git 
a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/loglevelfilter/LogLevelFilterMap.java
 
b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/loglevelfilter/LogLevelFilterMap.java
new file mode 100644
index 0000000..37fdb9f
--- /dev/null
+++ 
b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/loglevelfilter/LogLevelFilterMap.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logsearch.config.api.model.loglevelfilter;
+
+import java.util.TreeMap;
+
+public class LogLevelFilterMap {
+  private TreeMap<String, LogLevelFilter> filter;
+
+  public TreeMap<String, LogLevelFilter> getFilter() {
+    return filter;
+  }
+
+  public void setFilter(TreeMap<String, LogLevelFilter> filter) {
+    this.filter = filter;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass1.java
----------------------------------------------------------------------
diff --git 
a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass1.java
 
b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass1.java
index 969eb30..fc3fe5b 100644
--- 
a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass1.java
+++ 
b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass1.java
@@ -24,6 +24,8 @@ import java.util.Map;
 
 import org.apache.ambari.logsearch.config.api.InputConfigMonitor;
 import org.apache.ambari.logsearch.config.api.LogSearchConfig;
+import 
org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter;
+import 
org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilterMap;
 
 public class LogSearchConfigClass1 implements LogSearchConfig {
   @Override
@@ -35,10 +37,14 @@ public class LogSearchConfigClass1 implements 
LogSearchConfig {
   }
 
   @Override
+  public void createInputConfig(String clusterName, String serviceName, String 
inputConfig) throws Exception {}
+
+  @Override
   public void setInputConfig(String clusterName, String serviceName, String 
inputConfig) throws Exception {}
 
   @Override
-  public void monitorInputConfigChanges(InputConfigMonitor configMonitor) 
throws Exception {}
+  public void monitorInputConfigChanges(InputConfigMonitor inputConfigMonitor, 
LogLevelFilterMonitor logLevelFilterMonitor)
+      throws Exception {}
 
   @Override
   public List<String> getServices(String clusterName) {
@@ -49,7 +55,18 @@ public class LogSearchConfigClass1 implements 
LogSearchConfig {
   public String getInputConfig(String clusterName, String serviceName) {
     return null;
   }
-  
+
+  @Override
+  public void createLogLevelFilter(String clusterName, String logId, 
LogLevelFilter filter) {}
+
+  @Override
+  public void setLogLevelFilters(String clusterName, LogLevelFilterMap 
filters) throws Exception {}
+
+  @Override
+  public LogLevelFilterMap getLogLevelFilters(String clusterName) {
+    return null;
+  }
+
   @Override
   public void close() {}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass2.java
----------------------------------------------------------------------
diff --git 
a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass2.java
 
b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass2.java
index 664ecc9..346edb3 100644
--- 
a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass2.java
+++ 
b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass2.java
@@ -24,6 +24,8 @@ import java.util.Map;
 
 import org.apache.ambari.logsearch.config.api.InputConfigMonitor;
 import org.apache.ambari.logsearch.config.api.LogSearchConfig;
+import 
org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter;
+import 
org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilterMap;
 
 public class LogSearchConfigClass2 implements LogSearchConfig {
   @Override
@@ -35,10 +37,14 @@ public class LogSearchConfigClass2 implements 
LogSearchConfig {
   }
 
   @Override
+  public void createInputConfig(String clusterName, String serviceName, String 
inputConfig) throws Exception {}
+
+  @Override
   public void setInputConfig(String clusterName, String serviceName, String 
inputConfig) throws Exception {}
 
   @Override
-  public void monitorInputConfigChanges(InputConfigMonitor configMonitor) 
throws Exception {}
+  public void monitorInputConfigChanges(InputConfigMonitor inputConfigMonitor, 
LogLevelFilterMonitor logLevelFilterMonitor)
+      throws Exception {}
 
   @Override
   public List<String> getServices(String clusterName) {
@@ -49,7 +55,18 @@ public class LogSearchConfigClass2 implements 
LogSearchConfig {
   public String getInputConfig(String clusterName, String serviceName) {
     return null;
   }
-  
+
+  @Override
+  public void createLogLevelFilter(String clusterName, String logId, 
LogLevelFilter filter) {}
+
+  @Override
+  public void setLogLevelFilters(String clusterName, LogLevelFilterMap 
filters) throws Exception {}
+
+  @Override
+  public LogLevelFilterMap getLogLevelFilters(String clusterName) {
+    return null;
+  }
+
   @Override
   public void close() {}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-config-zookeeper/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/pom.xml 
b/ambari-logsearch/ambari-logsearch-config-zookeeper/pom.xml
index 4ed8eba..2c59a4a 100644
--- a/ambari-logsearch/ambari-logsearch-config-zookeeper/pom.xml
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/pom.xml
@@ -70,5 +70,9 @@
       <artifactId>curator-recipes</artifactId>
       <version>2.12.0</version>
     </dependency>
+    <dependency>
+      <groupId>com.google.code.gson</groupId>
+      <artifactId>gson</artifactId>
+    </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java
----------------------------------------------------------------------
diff --git 
a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java
 
b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java
index 738fde2..5e22374 100644
--- 
a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java
+++ 
b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java
@@ -22,9 +22,13 @@ package org.apache.ambari.logsearch.config.zookeeper;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 
 import org.apache.ambari.logsearch.config.api.LogSearchConfig;
+import 
org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter;
+import 
org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilterMap;
 import org.apache.ambari.logsearch.config.api.InputConfigMonitor;
+import org.apache.ambari.logsearch.config.api.LogLevelFilterMonitor;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.curator.framework.CuratorFramework;
@@ -32,15 +36,19 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.recipes.cache.ChildData;
 import org.apache.curator.framework.recipes.cache.TreeCache;
 import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
+import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type;
 import org.apache.curator.framework.recipes.cache.TreeCacheListener;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Id;
 
 import com.google.common.base.Splitter;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
 
 public class LogSearchConfigZK implements LogSearchConfig {
   private static final Logger LOG = Logger.getLogger(LogSearchConfigZK.class);
@@ -49,6 +57,7 @@ public class LogSearchConfigZK implements LogSearchConfig {
   private static final int CONNECTION_TIMEOUT = 30000;
   private static final String DEFAULT_ZK_ROOT = "/logsearch";
   private static final long WAIT_FOR_ROOT_SLEEP_SECONDS = 10;
+  private static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS";
 
   private static final String CLUSTER_NAME_PROPERTY = "cluster.name";
   private static final String ZK_CONNECT_STRING_PROPERTY = 
"logsearch.config.zk_connect_string";
@@ -59,6 +68,7 @@ public class LogSearchConfigZK implements LogSearchConfig {
   private String root;
   private CuratorFramework client;
   private TreeCache cache;
+  private Gson gson;
 
   @Override
   public void init(Component component, Map<String, String> properties) throws 
Exception {
@@ -89,6 +99,8 @@ public class LogSearchConfigZK implements LogSearchConfig {
 
       cache = new TreeCache(client, String.format("%s/%s", root, 
properties.get(CLUSTER_NAME_PROPERTY)));
     }
+    
+    gson = new GsonBuilder().setDateFormat(DATE_FORMAT).create();
   }
 
   @Override
@@ -98,66 +110,43 @@ public class LogSearchConfigZK implements LogSearchConfig {
   }
 
   @Override
-  public void setInputConfig(String clusterName, String serviceName, String 
inputConfig) throws Exception {
+  public void createInputConfig(String clusterName, String serviceName, String 
inputConfig) throws Exception {
     String nodePath = String.format("%s/%s/input/%s", root, clusterName, 
serviceName);
-    
client.create().creatingParentContainersIfNeeded().withACL(getAcls()).forPath(nodePath,
 inputConfig.getBytes());
-    LOG.info("Set input config for the service " + serviceName + " for cluster 
" + clusterName);
-  }
-
-  private List<ACL> getAcls() {
-    String aclStr = properties.get(ZK_ACLS_PROPERTY);
-    if (StringUtils.isBlank(aclStr)) {
-      return ZooDefs.Ids.OPEN_ACL_UNSAFE;
+    try {
+      
client.create().creatingParentContainersIfNeeded().withACL(getAcls()).forPath(nodePath,
 inputConfig.getBytes());
+      LOG.info("Uploaded input config for the service " + serviceName + " for 
cluster " + clusterName);
+    } catch (NodeExistsException e) {
+      LOG.debug("Did not upload input config for service " + serviceName + " 
as it was already uploaded by another Log Feeder");
     }
-
-    List<ACL> acls = new ArrayList<>();
-    List<String> aclStrList = 
Splitter.on(",").omitEmptyStrings().trimResults().splitToList(aclStr);
-    for (String unparcedAcl : aclStrList) {
-      String[] parts = unparcedAcl.split(":");
-      if (parts.length == 3) {
-        acls.add(new ACL(parsePermission(parts[2]), new Id(parts[0], 
parts[1])));
-      }
-    }
-    return acls;
   }
 
-  private Integer parsePermission(String permission) {
-    int permissionCode = 0;
-    for (char each : permission.toLowerCase().toCharArray()) {
-      switch (each) {
-        case 'r':
-          permissionCode |= ZooDefs.Perms.READ;
-          break;
-        case 'w':
-          permissionCode |= ZooDefs.Perms.WRITE;
-          break;
-        case 'c':
-          permissionCode |= ZooDefs.Perms.CREATE;
-          break;
-        case 'd':
-          permissionCode |= ZooDefs.Perms.DELETE;
-          break;
-        case 'a':
-          permissionCode |= ZooDefs.Perms.ADMIN;
-          break;
-        default:
-          throw new IllegalArgumentException("Unsupported permission: " + 
permission);
-      }
-    }
-    return permissionCode;
+  @Override
+  public void setInputConfig(String clusterName, String serviceName, String 
inputConfig) throws Exception {
+    String nodePath = String.format("%s/%s/input/%s", root, clusterName, 
serviceName);
+    client.setData().forPath(nodePath, inputConfig.getBytes());
+    LOG.info("Set input config for the service " + serviceName + " for cluster 
" + clusterName);
   }
 
   @Override
-  public void monitorInputConfigChanges(final InputConfigMonitor 
configMonitor) throws Exception {
+  public void monitorInputConfigChanges(final InputConfigMonitor 
inputConfigMonitor,
+      final LogLevelFilterMonitor logLevelFilterMonitor ) throws Exception {
     TreeCacheListener listener = new TreeCacheListener() {
       public void childEvent(CuratorFramework client, TreeCacheEvent event) 
throws Exception {
-        if 
(!event.getData().getPath().startsWith(String.format("%s/%s/input/", root, 
properties.get(CLUSTER_NAME_PROPERTY)))) {
-          return;
-        }
-        
         String nodeName = ZKPaths.getNodeFromPath(event.getData().getPath());
         String nodeData = new String(event.getData().getData());
-        switch (event.getType()) {
+        Type eventType = event.getType();
+        
+        String configPathStab = String.format("%s/%s/", root, 
properties.get(CLUSTER_NAME_PROPERTY));
+        
+        if (event.getData().getPath().startsWith(configPathStab + "input/")) {
+          handleInputConfigChange(eventType, nodeName, nodeData);
+        } else if (event.getData().getPath().startsWith(configPathStab + 
"loglevelfilter/")) {
+          handleLogLevelFilterChange(eventType, nodeName, nodeData);
+        }
+      }
+
+      private void handleInputConfigChange(Type eventType, String nodeName, 
String nodeData) {
+        switch (eventType) {
           case NODE_ADDED:
             LOG.info("Node added under input ZK node: " + nodeName);
             addInputs(nodeName, nodeData);
@@ -177,16 +166,33 @@ public class LogSearchConfigZK implements LogSearchConfig 
{
       }
 
       private void removeInputs(String serviceName) {
-        configMonitor.removeInputs(serviceName);
+        inputConfigMonitor.removeInputs(serviceName);
       }
 
       private void addInputs(String serviceName, String inputConfig) {
         try {
-          configMonitor.loadInputConfigs(serviceName, inputConfig);
+          inputConfigMonitor.loadInputConfigs(serviceName, inputConfig);
         } catch (Exception e) {
           LOG.error("Could not load input configuration for service " + 
serviceName + ":\n" + inputConfig, e);
         }
       }
+
+      private void handleLogLevelFilterChange(Type eventType, String nodeName, 
String nodeData) {
+        switch (eventType) {
+          case NODE_ADDED:
+          case NODE_UPDATED:
+            LOG.info("Node added/updated under loglevelfilter ZK node: " + 
nodeName);
+            LogLevelFilter logLevelFilter = gson.fromJson(nodeData, 
LogLevelFilter.class);
+            logLevelFilterMonitor.setLogLevelFilter(nodeName, logLevelFilter);
+            break;
+          case NODE_REMOVED:
+            LOG.info("Node removed loglevelfilter input ZK node: " + nodeName);
+            logLevelFilterMonitor.removeLogLevelFilter(nodeName);
+            break;
+          default:
+            break;
+        }
+      }
     };
     cache.getListenable().addListener(listener);
     cache.start();
@@ -206,6 +212,89 @@ public class LogSearchConfigZK implements LogSearchConfig {
   }
 
   @Override
+  public void createLogLevelFilter(String clusterName, String logId, 
LogLevelFilter filter) throws Exception {
+    String nodePath = String.format("%s/%s/loglevelfilter/%s", root, 
clusterName, logId);
+    String logLevelFilterJson = gson.toJson(filter);
+    try {
+      
client.create().creatingParentContainersIfNeeded().withACL(getAcls()).forPath(nodePath,
 logLevelFilterJson.getBytes());
+      LOG.info("Uploaded log level filter for the log " + logId + " for 
cluster " + clusterName);
+    } catch (NodeExistsException e) {
+      LOG.debug("Did not upload log level filters for log " + logId + " as it 
was already uploaded by another Log Feeder");
+    }
+  }
+
+  @Override
+  public void setLogLevelFilters(String clusterName, LogLevelFilterMap 
filters) throws Exception {
+    for (Map.Entry<String, LogLevelFilter> e : filters.getFilter().entrySet()) 
{
+      String nodePath = String.format("%s/%s/loglevelfilter/%s", root, 
clusterName, e.getKey());
+      String logLevelFilterJson = gson.toJson(e.getValue());
+      String currentLogLevelFilterJson = new 
String(cache.getCurrentData(nodePath).getData());
+      if (!logLevelFilterJson.equals(currentLogLevelFilterJson)) {
+        client.setData().forPath(nodePath, logLevelFilterJson.getBytes());
+        LOG.info("Set log level filter for the log " + e.getKey() + " for 
cluster " + clusterName);
+      }
+    }
+  }
+
+  @Override
+  public LogLevelFilterMap getLogLevelFilters(String clusterName) {
+    String parentPath = String.format("%s/%s/loglevelfilter", root, 
clusterName);
+    Map<String, ChildData> logLevelFilterNodes = 
cache.getCurrentChildren(parentPath);
+    TreeMap<String, LogLevelFilter> filters = new TreeMap<>();
+    for (Map.Entry<String, ChildData> e : logLevelFilterNodes.entrySet()) {
+      LogLevelFilter logLevelFilter = gson.fromJson(new 
String(e.getValue().getData()), LogLevelFilter.class);
+      filters.put(e.getKey(), logLevelFilter);
+    }
+    
+    LogLevelFilterMap logLevelFilters = new LogLevelFilterMap();
+    logLevelFilters.setFilter(filters);
+    return logLevelFilters;
+  }
+
+  private List<ACL> getAcls() {
+    String aclStr = properties.get(ZK_ACLS_PROPERTY);
+    if (StringUtils.isBlank(aclStr)) {
+      return ZooDefs.Ids.OPEN_ACL_UNSAFE;
+    }
+
+    List<ACL> acls = new ArrayList<>();
+    List<String> aclStrList = 
Splitter.on(",").omitEmptyStrings().trimResults().splitToList(aclStr);
+    for (String unparcedAcl : aclStrList) {
+      String[] parts = unparcedAcl.split(":");
+      if (parts.length == 3) {
+        acls.add(new ACL(parsePermission(parts[2]), new Id(parts[0], 
parts[1])));
+      }
+    }
+    return acls;
+  }
+
+  private Integer parsePermission(String permission) {
+    int permissionCode = 0;
+    for (char each : permission.toLowerCase().toCharArray()) {
+      switch (each) {
+        case 'r':
+          permissionCode |= ZooDefs.Perms.READ;
+          break;
+        case 'w':
+          permissionCode |= ZooDefs.Perms.WRITE;
+          break;
+        case 'c':
+          permissionCode |= ZooDefs.Perms.CREATE;
+          break;
+        case 'd':
+          permissionCode |= ZooDefs.Perms.DELETE;
+          break;
+        case 'a':
+          permissionCode |= ZooDefs.Perms.ADMIN;
+          break;
+        default:
+          throw new IllegalArgumentException("Unsupported permission: " + 
permission);
+      }
+    }
+    return permissionCode;
+  }
+
+  @Override
   public void close() {
     LOG.info("Closing ZooKeeper Connection");
     client.close();

http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
----------------------------------------------------------------------
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
index 074fedb..c853f42 100644
--- 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
+++ 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
@@ -29,7 +29,7 @@ import 
org.apache.ambari.logsearch.config.api.LogSearchConfig.Component;
 import org.apache.ambari.logsearch.config.zookeeper.LogSearchConfigZK;
 import org.apache.ambari.logfeeder.input.InputConfigUploader;
 import org.apache.ambari.logfeeder.input.InputManager;
-import org.apache.ambari.logfeeder.logconfig.LogConfigHandler;
+import org.apache.ambari.logfeeder.loglevelfilter.LogLevelFilterHandler;
 import org.apache.ambari.logfeeder.metrics.MetricData;
 import org.apache.ambari.logfeeder.metrics.MetricsManager;
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
@@ -71,13 +71,13 @@ public class LogFeeder {
     long startTime = System.currentTimeMillis();
 
     configHandler.init();
-    LogConfigHandler.handleConfig();
     SSLUtil.ensureStorePasswords();
     
     config = LogSearchConfigFactory.createLogSearchConfig(Component.LOGFEEDER,
         Maps.fromProperties(LogFeederUtil.getProperties()), 
LogSearchConfigZK.class);
+    LogLevelFilterHandler.init(config);
     InputConfigUploader.load(config);
-    config.monitorInputConfigChanges(configHandler);
+    config.monitorInputConfigChanges(configHandler, new 
LogLevelFilterHandler());
     
     metricsManager.init();
     

http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java
----------------------------------------------------------------------
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java
 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java
index b70fbd1..8aec690 100644
--- 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java
+++ 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java
@@ -75,7 +75,7 @@ public class InputConfigUploader extends Thread {
             String inputConfig = Files.toString(inputConfigFile, 
Charset.defaultCharset());
             
             if (!config.inputConfigExists(clusterName, serviceName)) {
-              config.setInputConfig(clusterName, serviceName, inputConfig);
+              config.createInputConfig(clusterName, serviceName, inputConfig);
             }
             filesHandled.add(inputConfigFile.getAbsolutePath());
           } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FilterLogData.java
----------------------------------------------------------------------
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FilterLogData.java
 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FilterLogData.java
deleted file mode 100644
index a05a916..0000000
--- 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FilterLogData.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ambari.logfeeder.logconfig;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.ambari.logfeeder.common.LogFeederConstants;
-import org.apache.ambari.logfeeder.input.InputMarker;
-import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.collections.MapUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.log4j.Logger;
-
-/**
- * Read configuration from solr and filter the log
- */
-public enum FilterLogData {
-  INSTANCE;
-  
-  private static final Logger LOG = Logger.getLogger(FilterLogData.class);
-  
-  private static final boolean DEFAULT_VALUE = true;
-
-  public boolean isAllowed(String jsonBlock, InputMarker inputMarker) {
-    if (StringUtils.isEmpty(jsonBlock)) {
-      return DEFAULT_VALUE;
-    }
-    Map<String, Object> jsonObj = LogFeederUtil.toJSONObject(jsonBlock);
-    return isAllowed(jsonObj, inputMarker);
-  }
-
-  public boolean isAllowed(Map<String, Object> jsonObj, InputMarker 
inputMarker) {
-    if 
("audit".equals(inputMarker.input.getConfigs().get(LogFeederConstants.ROW_TYPE)))
-      return true;
-    
-    boolean isAllowed = applyFilter(jsonObj);
-    if (!isAllowed) {
-      LOG.trace("Filter block the content :" + 
LogFeederUtil.getGson().toJson(jsonObj));
-    }
-    return isAllowed;
-  }
-  
-
-  private boolean applyFilter(Map<String, Object> jsonObj) {
-    if (MapUtils.isEmpty(jsonObj)) {
-      LOG.warn("Output jsonobj is empty");
-      return DEFAULT_VALUE;
-    }
-    
-    String hostName = (String) jsonObj.get(LogFeederConstants.SOLR_HOST);
-    String componentName = (String) 
jsonObj.get(LogFeederConstants.SOLR_COMPONENT);
-    String level = (String) jsonObj.get(LogFeederConstants.SOLR_LEVEL);
-    if (StringUtils.isNotBlank(hostName) && 
StringUtils.isNotBlank(componentName) && StringUtils.isNotBlank(level)) {
-      LogFeederFilter componentFilter = 
LogConfigHandler.findComponentFilter(componentName);
-      if (componentFilter == null) {
-        return DEFAULT_VALUE;
-      }
-      List<String> allowedLevels = LogConfigHandler.getAllowedLevels(hostName, 
componentFilter);
-      if (CollectionUtils.isEmpty(allowedLevels)) {
-        allowedLevels.add(LogFeederConstants.ALL);
-      }
-      return LogFeederUtil.isListContains(allowedLevels, level, false);
-    }
-    else {
-      return DEFAULT_VALUE;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogConfigFetcher.java
----------------------------------------------------------------------
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogConfigFetcher.java
 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogConfigFetcher.java
deleted file mode 100644
index 12c744c..0000000
--- 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogConfigFetcher.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ambari.logfeeder.logconfig;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.ambari.logfeeder.common.LogFeederConstants;
-import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.apache.solr.client.solrj.SolrClient;
-import org.apache.solr.client.solrj.SolrQuery;
-import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.SolrRequest.METHOD;
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
-import org.apache.solr.client.solrj.impl.HttpSolrClient;
-import org.apache.solr.client.solrj.request.CollectionAdminRequest;
-import org.apache.solr.client.solrj.response.CollectionAdminResponse;
-import org.apache.solr.client.solrj.response.QueryResponse;
-import org.apache.solr.common.SolrDocument;
-import org.apache.solr.common.SolrDocumentList;
-import org.apache.solr.common.SolrException;
-
-public class LogConfigFetcher {
-  private static final Logger LOG = Logger.getLogger(LogConfigFetcher.class);
-  
-  private static LogConfigFetcher instance;
-  public synchronized static LogConfigFetcher getInstance() {
-    if (instance == null) {
-      try {
-        instance = new LogConfigFetcher();
-      } catch (Exception e) {
-        String logMessageKey = LogConfigFetcher.class.getSimpleName() + 
"_SOLR_UTIL";
-              LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Error 
constructing solrUtil", e, LOG, Level.WARN);
-      }
-    }
-    return instance;
-  }
-
-  private SolrClient solrClient;
-
-  private String solrDetail = "";
-
-  public LogConfigFetcher() throws Exception {
-    String url = LogFeederUtil.getStringProperty("logfeeder.solr.url");
-    String zkConnectString = 
LogFeederUtil.getStringProperty("logfeeder.solr.zk_connect_string");
-    String collection = 
LogFeederUtil.getStringProperty("logfeeder.solr.core.config.name", "history");
-    connectToSolr(url, zkConnectString, collection);
-  }
-
-  private SolrClient connectToSolr(String url, String zkConnectString, String 
collection) throws Exception {
-    solrDetail = "zkConnectString=" + zkConnectString + ", collection=" + 
collection + ", url=" + url;
-
-    LOG.info("connectToSolr() " + solrDetail);
-    if (StringUtils.isEmpty(collection)) {
-      throw new Exception("For solr, collection name is mandatory. " + 
solrDetail);
-    }
-    
-    if (StringUtils.isEmpty(zkConnectString) && StringUtils.isBlank(url))
-      throw new Exception("Both zkConnectString and URL are empty. 
zkConnectString=" + zkConnectString + ", collection=" +
-          collection + ", url=" + url);
-    
-    if (StringUtils.isNotEmpty(zkConnectString)) {
-      solrDetail = "zkConnectString=" + zkConnectString + ", collection=" + 
collection;
-      LOG.info("Using zookeepr. " + solrDetail);
-      CloudSolrClient solrClouldClient = new CloudSolrClient(zkConnectString);
-      solrClouldClient.setDefaultCollection(collection);
-      solrClient = solrClouldClient;
-      checkSolrStatus(3 * 60 * 1000);
-    } else {
-      solrDetail = "collection=" + collection + ", url=" + url;
-      String collectionURL = url + "/" + collection;
-      LOG.info("Connecting to  solr : " + collectionURL);
-      solrClient = new HttpSolrClient(collectionURL);
-    }
-    return solrClient;
-  }
-
-  private boolean checkSolrStatus(int waitDurationMS) {
-    boolean status = false;
-    try {
-      long beginTimeMS = System.currentTimeMillis();
-      long waitIntervalMS = 2000;
-      int pingCount = 0;
-      while (true) {
-        pingCount++;
-        CollectionAdminResponse response = null;
-        try {
-          CollectionAdminRequest.List colListReq = new 
CollectionAdminRequest.List();
-          response = colListReq.process(solrClient);
-        } catch (Exception ex) {
-          LOG.error("Con't connect to Solr. solrDetail=" + solrDetail, ex);
-        }
-        if (response != null && response.getStatus() == 0) {
-          LOG.info("Solr getCollections() is success. solr=" + solrDetail);
-          status = true;
-          break;
-        }
-        if (System.currentTimeMillis() - beginTimeMS > waitDurationMS) {
-          LOG.error("Solr is not reachable even after " + 
(System.currentTimeMillis() - beginTimeMS)
-            + " ms. If you are using alias, then you might have to restart 
LogSearch after Solr is up and running. solr="
-            + solrDetail + ", response=" + response);
-          break;
-        } else {
-          LOG.warn("Solr is not reachable yet. getCollections() attempt 
count=" + pingCount + ". Will sleep for " +
-              waitIntervalMS + " ms and try again." + " solr=" + solrDetail + 
", response=" + response);
-        }
-        Thread.sleep(waitIntervalMS);
-      }
-    } catch (Throwable t) {
-      LOG.error("Seems Solr is not up. solrDetail=" + solrDetail, t);
-    }
-    return status;
-  }
-
-  public Map<String, Object> getConfigDoc() {
-    HashMap<String, Object> configMap = new HashMap<String, Object>();
-    SolrQuery solrQuery = new SolrQuery();
-    solrQuery.setQuery("*:*");
-    String fq = LogFeederConstants.ROW_TYPE + ":" + 
LogFeederConstants.LOGFEEDER_FILTER_NAME;
-    solrQuery.setFilterQueries(fq);
-    try {
-      QueryResponse response = process(solrQuery);
-      if (response != null) {
-        SolrDocumentList documentList = response.getResults();
-        if (CollectionUtils.isNotEmpty(documentList)) {
-          SolrDocument configDoc = documentList.get(0);
-          String configJson = LogFeederUtil.getGson().toJson(configDoc);
-          configMap = (HashMap<String, Object>) 
LogFeederUtil.toJSONObject(configJson);
-        }
-      }
-    } catch (Exception e) {
-      String logMessageKey = this.getClass().getSimpleName() + 
"_FETCH_FILTER_CONFIG_ERROR";
-      LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Error getting 
filter config from solr", e, LOG, Level.ERROR);
-    }
-    return configMap;
-  }
-
-  private QueryResponse process(SolrQuery solrQuery) throws 
SolrServerException, IOException, SolrException {
-    if (solrClient != null) {
-      QueryResponse queryResponse = solrClient.query(solrQuery, METHOD.POST);
-      return queryResponse;
-    } else {
-      LOG.error("solrClient can't be null");
-      return null;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandler.java
----------------------------------------------------------------------
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandler.java
 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandler.java
deleted file mode 100644
index 0ece637..0000000
--- 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandler.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ambari.logfeeder.logconfig;
-
-import java.text.DateFormat;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TimeZone;
-
-import org.apache.ambari.logfeeder.common.LogFeederConstants;
-import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.log4j.Logger;
-
-public class LogConfigHandler extends Thread {
-  private static final Logger LOG = Logger.getLogger(LogConfigHandler.class);
-  
-  private static final int DEFAULT_SOLR_CONFIG_INTERVAL = 5;
-  private static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS";
-  private static final String TIMEZONE = "GMT";
-  private static final int RETRY_INTERVAL = 30;
-
-  static {
-    TimeZone.setDefault(TimeZone.getTimeZone(TIMEZONE));
-  }
-  
-  private static ThreadLocal<DateFormat> formatter = new 
ThreadLocal<DateFormat>() {
-    protected DateFormat initialValue() {
-      SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
-      dateFormat.setTimeZone(TimeZone.getTimeZone(TIMEZONE));
-      return dateFormat;
-    }
-  };
-  
-  private static boolean filterEnabled;
-  private static LogFeederFilterWrapper logFeederFilterWrapper;
-
-  private static boolean running = false;
-
-  public static void handleConfig() {
-    filterEnabled = 
LogFeederUtil.getBooleanProperty("logfeeder.log.filter.enable", false);
-    if (!filterEnabled) {
-      LOG.info("Logfeeder filter Scheduler is disabled.");
-      return;
-    }
-    if (!running) {
-      new LogConfigHandler().start();
-      running = true;
-      LOG.info("Logfeeder Filter Thread started!");
-    } else {
-      LOG.warn("Logfeeder Filter Thread is already running.");
-    }
-  }
-  
-  private LogConfigHandler() {
-    setName(getClass().getSimpleName());
-    setDaemon(true);
-  }
-
-  @Override
-  public void run() {
-    String zkConnectString = 
LogFeederUtil.getStringProperty("logfeeder.solr.zk_connect_string");
-    String solrUrl = LogFeederUtil.getStringProperty("logfeeder.solr.url");
-    if (StringUtils.isBlank(zkConnectString) && StringUtils.isBlank(solrUrl)) {
-      LOG.warn("Neither Solr ZK Connect String nor solr Url for 
UserConfig/History is set." +
-          "Won't look for level configuration from Solr.");
-      return;
-    }
-    
-    int solrConfigInterval = 
LogFeederUtil.getIntProperty("logfeeder.solr.config.interval", 
DEFAULT_SOLR_CONFIG_INTERVAL);
-    do {
-      LOG.debug("Updating config from solr after every " + solrConfigInterval 
+ " sec.");
-      fetchConfig();
-      try {
-        Thread.sleep(1000 * solrConfigInterval);
-      } catch (InterruptedException e) {
-        LOG.error(e.getLocalizedMessage(), e.getCause());
-      }
-    } while (true);
-  }
-
-  private synchronized void fetchConfig() {
-    LogConfigFetcher fetcher = LogConfigFetcher.getInstance();
-    if (fetcher != null) {
-      Map<String, Object> configDocMap = fetcher.getConfigDoc();
-      String configJson = (String) configDocMap.get(LogFeederConstants.VALUES);
-      if (configJson != null) {
-        logFeederFilterWrapper = LogFeederUtil.getGson().fromJson(configJson, 
LogFeederFilterWrapper.class);
-      }
-    }
-  }
-
-  public static boolean isFilterAvailable() {
-    return logFeederFilterWrapper != null;
-  }
-
-  public static List<String> getAllowedLevels(String hostName, LogFeederFilter 
componentFilter) {
-    String componentName = componentFilter.getLabel();
-    List<String> hosts = componentFilter.getHosts();
-    List<String> defaultLevels = componentFilter.getDefaultLevels();
-    List<String> overrideLevels = componentFilter.getOverrideLevels();
-    String expiryTime = componentFilter.getExpiryTime();
-    
-    // check is user override or not
-    if (StringUtils.isNotEmpty(expiryTime) || 
CollectionUtils.isNotEmpty(overrideLevels) || 
CollectionUtils.isNotEmpty(hosts)) {
-      if (CollectionUtils.isEmpty(hosts)) { // hosts list is empty or null 
consider it apply on all hosts
-        hosts.add(LogFeederConstants.ALL);
-      }
-      
-      if (LogFeederUtil.isListContains(hosts, hostName, false)) {
-        if (isFilterExpired(componentFilter)) {
-          LOG.debug("Filter for component " + componentName + " and host :" + 
hostName + " is expired at " +
-              componentFilter.getExpiryTime());
-          return defaultLevels;
-        } else {
-          return overrideLevels;
-        }
-      }
-    }
-    return defaultLevels;
-  }
-
-  private static boolean isFilterExpired(LogFeederFilter logfeederFilter) {
-    if (logfeederFilter == null)
-      return false;
-    
-    Date filterEndDate = parseFilterExpireDate(logfeederFilter);
-    if (filterEndDate == null) {
-      return false;
-    }
-    
-    Date currentDate = new Date();
-    if (!currentDate.before(filterEndDate)) {
-      LOG.debug("Filter for  Component :" + logfeederFilter.getLabel() + " and 
Hosts : [" +
-          StringUtils.join(logfeederFilter.getHosts(), ',') + "] is expired 
because of filter endTime : " +
-          formatter.get().format(filterEndDate) + " is older than currentTime 
:" + formatter.get().format(currentDate));
-      return true;
-    } else {
-      return false;
-    }
-  }
-
-  private static Date parseFilterExpireDate(LogFeederFilter vLogfeederFilter) {
-    String expiryTime = vLogfeederFilter.getExpiryTime();
-    if (StringUtils.isNotEmpty(expiryTime)) {
-      try {
-        return formatter.get().parse(expiryTime);
-      } catch (ParseException e) {
-        LOG.error("Filter have invalid ExpiryTime : " + expiryTime + " for 
component :" + vLogfeederFilter.getLabel()
-          + " and hosts : [" + StringUtils.join(vLogfeederFilter.getHosts(), 
',') + "]");
-      }
-    }
-    return null;
-  }
-  
-  public static LogFeederFilter findComponentFilter(String componentName) {
-    waitForFilter();
-    
-    if (logFeederFilterWrapper != null) {
-      HashMap<String, LogFeederFilter> filter = 
logFeederFilterWrapper.getFilter();
-      if (filter != null) {
-        LogFeederFilter componentFilter = filter.get(componentName);
-        if (componentFilter != null) {
-          return componentFilter;
-        }
-      }
-    }
-    LOG.trace("Filter is not there for component :" + componentName);
-    return null;
-  }
-
-  private static void waitForFilter() {
-    if (!filterEnabled || logFeederFilterWrapper != null) {
-      return;
-    }
-    
-    while (true) {
-      try {
-        Thread.sleep(RETRY_INTERVAL * 1000);
-      } catch (InterruptedException e) {
-        LOG.error(e);
-      }
-      
-      LOG.info("Checking if config is available");
-      if (logFeederFilterWrapper != null) {
-        LOG.info("Config is available");
-        return;
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederFilter.java
----------------------------------------------------------------------
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederFilter.java
 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederFilter.java
deleted file mode 100644
index 60c8ae8..0000000
--- 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederFilter.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ambari.logfeeder.logconfig;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import javax.xml.bind.annotation.XmlAccessType;
-import javax.xml.bind.annotation.XmlAccessorType;
-import javax.xml.bind.annotation.XmlRootElement;
-
-import org.codehaus.jackson.annotate.JsonAutoDetect;
-import org.codehaus.jackson.annotate.JsonAutoDetect.Visibility;
-import org.codehaus.jackson.map.annotate.JsonSerialize;
-
-@JsonAutoDetect(getterVisibility = Visibility.NONE, setterVisibility = 
Visibility.NONE, fieldVisibility = Visibility.ANY)
-@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
-@XmlRootElement
-@XmlAccessorType(XmlAccessType.FIELD)
-public class LogFeederFilter {
-
-  private String label;
-  private List<String> hosts;
-  private List<String> defaultLevels;
-  private List<String> overrideLevels;
-  private String expiryTime;
-
-  public LogFeederFilter() {
-    hosts = new ArrayList<String>();
-    defaultLevels = new ArrayList<String>();
-    overrideLevels = new ArrayList<String>();
-  }
-
-  public String getLabel() {
-    return label;
-  }
-
-  public void setLabel(String label) {
-    this.label = label;
-  }
-
-  public List<String> getHosts() {
-    return hosts;
-  }
-
-  public void setHosts(List<String> hosts) {
-    this.hosts = hosts;
-  }
-
-  public List<String> getDefaultLevels() {
-    return defaultLevels;
-  }
-
-  public void setDefaultLevels(List<String> defaultLevels) {
-    this.defaultLevels = defaultLevels;
-  }
-
-  public List<String> getOverrideLevels() {
-    return overrideLevels;
-  }
-
-  public void setOverrideLevels(List<String> overrideLevels) {
-    this.overrideLevels = overrideLevels;
-  }
-
-  public String getExpiryTime() {
-    return expiryTime;
-  }
-
-  public void setExpiryTime(String expiryTime) {
-    this.expiryTime = expiryTime;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederFilterWrapper.java
----------------------------------------------------------------------
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederFilterWrapper.java
 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederFilterWrapper.java
deleted file mode 100644
index 9199cd3..0000000
--- 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederFilterWrapper.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ambari.logfeeder.logconfig;
-
-import java.util.HashMap;
-
-import javax.xml.bind.annotation.XmlAccessType;
-import javax.xml.bind.annotation.XmlAccessorType;
-import javax.xml.bind.annotation.XmlRootElement;
-
-import org.codehaus.jackson.annotate.JsonAutoDetect;
-import org.codehaus.jackson.annotate.JsonAutoDetect.Visibility;
-import org.codehaus.jackson.map.annotate.JsonSerialize;
-
-@JsonAutoDetect(getterVisibility = Visibility.NONE, setterVisibility = 
Visibility.NONE, fieldVisibility = Visibility.ANY)
-@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
-@XmlRootElement
-@XmlAccessorType(XmlAccessType.FIELD)
-public class LogFeederFilterWrapper {
-
-  private HashMap<String, LogFeederFilter> filter;
-  private String id;
-
-  public HashMap<String, LogFeederFilter> getFilter() {
-    return filter;
-  }
-
-  public void setFilter(HashMap<String, LogFeederFilter> filter) {
-    this.filter = filter;
-  }
-
-  public String getId() {
-    return id;
-  }
-
-  public void setId(String id) {
-    this.id = id;
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/FilterLogData.java
----------------------------------------------------------------------
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/FilterLogData.java
 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/FilterLogData.java
new file mode 100644
index 0000000..1f635af
--- /dev/null
+++ 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/FilterLogData.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.loglevelfilter;
+
+import java.util.Map;
+
+import org.apache.ambari.logfeeder.common.LogFeederConstants;
+import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.log4j.Logger;
+
+public enum FilterLogData {
+  INSTANCE;
+  
+  private static final Logger LOG = Logger.getLogger(FilterLogData.class);
+  
+  private static final boolean DEFAULT_VALUE = true;
+
+  public boolean isAllowed(String jsonBlock, InputMarker inputMarker) {
+    if (StringUtils.isEmpty(jsonBlock)) {
+      return DEFAULT_VALUE;
+    }
+    Map<String, Object> jsonObj = LogFeederUtil.toJSONObject(jsonBlock);
+    return isAllowed(jsonObj, inputMarker);
+  }
+
+  public boolean isAllowed(Map<String, Object> jsonObj, InputMarker 
inputMarker) {
+    if 
("audit".equals(inputMarker.input.getConfigs().get(LogFeederConstants.ROW_TYPE)))
+      return true;
+    
+    boolean isAllowed = applyFilter(jsonObj);
+    if (!isAllowed) {
+      LOG.trace("Filter block the content :" + 
LogFeederUtil.getGson().toJson(jsonObj));
+    }
+    return isAllowed;
+  }
+  
+
+  private boolean applyFilter(Map<String, Object> jsonObj) {
+    if (MapUtils.isEmpty(jsonObj)) {
+      LOG.warn("Output jsonobj is empty");
+      return DEFAULT_VALUE;
+    }
+    
+    String hostName = (String) jsonObj.get(LogFeederConstants.SOLR_HOST);
+    String logId = (String) jsonObj.get(LogFeederConstants.SOLR_COMPONENT);
+    String level = (String) jsonObj.get(LogFeederConstants.SOLR_LEVEL);
+    if (StringUtils.isNotBlank(hostName) && StringUtils.isNotBlank(logId) && 
StringUtils.isNotBlank(level)) {
+      return LogLevelFilterHandler.isAllowed(hostName, logId, level);
+    } else {
+      return DEFAULT_VALUE;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/LogLevelFilterHandler.java
----------------------------------------------------------------------
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/LogLevelFilterHandler.java
 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/LogLevelFilterHandler.java
new file mode 100644
index 0000000..8a4d953
--- /dev/null
+++ 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/LogLevelFilterHandler.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.loglevelfilter;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+
+import org.apache.ambari.logfeeder.common.LogFeederConstants;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.ambari.logsearch.config.api.LogLevelFilterMonitor;
+import org.apache.ambari.logsearch.config.api.LogSearchConfig;
+import 
org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.log4j.Logger;
+
+public class LogLevelFilterHandler implements LogLevelFilterMonitor {
+  private static final Logger LOG = 
Logger.getLogger(LogLevelFilterHandler.class);
+  
+  private static final String TIMEZONE = "GMT";
+  private static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS";
+  
+  private static ThreadLocal<DateFormat> formatter = new 
ThreadLocal<DateFormat>() {
+    protected DateFormat initialValue() {
+      SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
+      dateFormat.setTimeZone(TimeZone.getTimeZone(TIMEZONE));
+      return dateFormat;
+    }
+  };
+  
+  private static LogSearchConfig config;
+  private static String clusterName = 
LogFeederUtil.getStringProperty("cluster.name");
+  private static boolean filterEnabled;
+  private static List<String> defaultLogLevels;
+  private static Map<String, LogLevelFilter> filters = new HashMap<>();
+
+  public static void init(LogSearchConfig config_) {
+    config = config_;
+    filterEnabled = 
LogFeederUtil.getBooleanProperty("logfeeder.log.filter.enable", false);
+    defaultLogLevels = 
Arrays.asList(LogFeederUtil.getStringProperty("logfeeder.include.default.level").split(","));
+    TimeZone.setDefault(TimeZone.getTimeZone(TIMEZONE));
+  }
+
+  @Override
+  public void setLogLevelFilter(String logId, LogLevelFilter logLevelFilter) {
+    synchronized (LogLevelFilterHandler.class) {
+      filters.put(logId, logLevelFilter);
+    }
+  }
+
+  @Override
+  public void removeLogLevelFilter(String logId) {
+    synchronized (LogLevelFilterHandler.class) {
+      filters.remove(logId);
+    }
+  }
+
+  public static boolean isAllowed(String hostName, String logId, String level) 
{
+    if (!filterEnabled) {
+      return true;
+    }
+    
+    LogLevelFilter logFilter = findLogFilter(logId);
+    List<String> allowedLevels = getAllowedLevels(hostName, logFilter);
+    return allowedLevels.isEmpty() || allowedLevels.contains(level);
+  }
+
+  private static synchronized LogLevelFilter findLogFilter(String logId) {
+    LogLevelFilter logFilter = filters.get(logId);
+    if (logFilter != null) {
+      return logFilter;
+    }
+
+    LOG.info("Filter is not present for log " + logId + ", creating default 
filter");
+    LogLevelFilter defaultFilter = new LogLevelFilter();
+    defaultFilter.setLabel(logId);
+    defaultFilter.setDefaultLevels(defaultLogLevels);
+
+    try {
+      config.createLogLevelFilter(clusterName, logId, defaultFilter);
+      filters.put(logId, defaultFilter);
+    } catch (Exception e) {
+      LOG.warn("Could not persist the default filter for log " + logId, e);
+    }
+
+    return defaultFilter;
+  }
+
+  private static List<String> getAllowedLevels(String hostName, LogLevelFilter 
componentFilter) {
+    String componentName = componentFilter.getLabel();
+    List<String> hosts = componentFilter.getHosts();
+    List<String> defaultLevels = componentFilter.getDefaultLevels();
+    List<String> overrideLevels = componentFilter.getOverrideLevels();
+    Date expiryTime = componentFilter.getExpiryTime();
+
+    // check is user override or not
+    if (expiryTime != null || CollectionUtils.isNotEmpty(overrideLevels) || 
CollectionUtils.isNotEmpty(hosts)) {
+      if (CollectionUtils.isEmpty(hosts)) { // hosts list is empty or null 
consider it apply on all hosts
+        hosts.add(LogFeederConstants.ALL);
+      }
+
+      if (hosts.isEmpty() || hosts.contains(hostName)) {
+        if (isFilterExpired(componentFilter)) {
+          LOG.debug("Filter for component " + componentName + " and host :" + 
hostName + " is expired at " +
+              componentFilter.getExpiryTime());
+          return defaultLevels;
+        } else {
+          return overrideLevels;
+        }
+      }
+    }
+    return defaultLevels;
+  }
+
+  private static boolean isFilterExpired(LogLevelFilter logLevelFilter) {
+    if (logLevelFilter == null)
+      return false;
+
+    Date filterEndDate = logLevelFilter.getExpiryTime();
+    if (filterEndDate == null) {
+      return false;
+    }
+
+    Date currentDate = new Date();
+    if (!currentDate.before(filterEndDate)) {
+      LOG.debug("Filter for  Component :" + logLevelFilter.getLabel() + " and 
Hosts : [" +
+          StringUtils.join(logLevelFilter.getHosts(), ',') + "] is expired 
because of filter endTime : " +
+          formatter.get().format(filterEndDate) + " is older than currentTime 
:" + formatter.get().format(currentDate));
+      return true;
+    } else {
+      return false;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java
----------------------------------------------------------------------
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java
 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java
index 135fb32..ba872f8 100644
--- 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java
+++ 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java
@@ -29,7 +29,7 @@ import java.util.UUID;
 import org.apache.ambari.logfeeder.common.LogFeederConstants;
 import org.apache.ambari.logfeeder.input.Input;
 import org.apache.ambari.logfeeder.input.InputMarker;
-import org.apache.ambari.logfeeder.logconfig.FilterLogData;
+import org.apache.ambari.logfeeder.loglevelfilter.FilterLogData;
 import org.apache.ambari.logfeeder.metrics.MetricData;
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.ambari.logfeeder.util.MurmurHash;

http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java
----------------------------------------------------------------------
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java
 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java
index bb2f0a9..1929178 100644
--- 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java
+++ 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java
@@ -32,7 +32,6 @@ import java.util.Map;
 import java.util.Properties;
 
 import org.apache.ambari.logfeeder.LogFeeder;
-import org.apache.ambari.logfeeder.common.LogFeederConstants;
 import org.apache.ambari.logfeeder.metrics.MetricData;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -311,24 +310,6 @@ public class LogFeederUtil {
       return false;
     }
   }
-
-  public static boolean isListContains(List<String> list, String str, boolean 
caseSensitive) {
-    if (list == null) {
-      return false;
-    }
-    
-    for (String value : list) {
-      if (value == null) {
-        continue;
-      }
-      
-      if (caseSensitive ? value.equals(str) : value.equalsIgnoreCase(str) ||
-          value.equalsIgnoreCase(LogFeederConstants.ALL)) {
-        return true;
-      }
-    }
-    return false;
-  }
   
   private static String logfeederTempDir = null;
   

http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandlerTest.java
 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandlerTest.java
index 266108f..44314c6 100644
--- 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandlerTest.java
+++ 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandlerTest.java
@@ -18,8 +18,9 @@
 
 package org.apache.ambari.logfeeder.logconfig;
 
-import java.lang.reflect.Field;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -29,15 +30,17 @@ import static org.junit.Assert.*;
 import org.apache.ambari.logfeeder.common.LogFeederConstants;
 import org.apache.ambari.logfeeder.input.Input;
 import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.loglevelfilter.FilterLogData;
+import org.apache.ambari.logfeeder.loglevelfilter.LogLevelFilterHandler;
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.junit.AfterClass;
+import org.apache.ambari.logsearch.config.api.LogSearchConfig;
+import 
org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter;
+import org.apache.commons.lang.time.DateUtils;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class LogConfigHandlerTest {
   
-  private static LogConfigFetcher mockFetcher;
-  
   private static InputMarker inputMarkerAudit;
   private static InputMarker inputMarkerService;
   static {
@@ -56,47 +59,41 @@ public class LogConfigHandlerTest {
     replay(auditInput, serviceInput);
   }
   
-  private static final Map<String, Object> CONFIG_MAP = new HashMap<>();
-  static {
-    CONFIG_MAP.put("jsons",
-        "{'filter':{" +
-          "'configured_log_file':{" +
-            "'label':'configured_log_file'," +
-            "'hosts':[]," +
-            "'defaultLevels':['FATAL','ERROR','WARN','INFO']," +
-            "'overrideLevels':[]}," +
-          "'configured_log_file2':{" +
-            "'label':'configured_log_file2'," +
-            "'hosts':['host1']," +
-            "'defaultLevels':['FATAL','ERROR','WARN','INFO']," +
-            
"'overrideLevels':['FATAL','ERROR','WARN','INFO','DEBUG','TRACE']," +
-            "'expiryTime':'3000-01-01T00:00:00.000Z'}," +
-          "'configured_log_file3':{" +
-            "'label':'configured_log_file3'," +
-            "'hosts':['host1']," +
-            "'defaultLevels':['FATAL','ERROR','WARN','INFO']," +
-            
"'overrideLevels':['FATAL','ERROR','WARN','INFO','DEBUG','TRACE']," +
-            "'expiryTime':'1000-01-01T00:00:00.000Z'}" +
-          "}}");
-  }
-  
   @BeforeClass
   public static void init() throws Exception {
-    mockFetcher = strictMock(LogConfigFetcher.class);
-    Field f = LogConfigFetcher.class.getDeclaredField("instance");
-    f.setAccessible(true);
-    f.set(null, mockFetcher);
-    expect(mockFetcher.getConfigDoc()).andReturn(CONFIG_MAP).anyTimes();
-    replay(mockFetcher);
-    
     LogFeederUtil.loadProperties("logfeeder.properties", null);
-    LogConfigHandler.handleConfig();
-    Thread.sleep(1000);
+    
+    LogSearchConfig config = strictMock(LogSearchConfig.class);
+    config.createLogLevelFilter(anyString(), anyString(), 
anyObject(LogLevelFilter.class));
+    expectLastCall().anyTimes();
+    LogLevelFilterHandler.init(config);
+    
+    LogLevelFilter logLevelFilter1 = new LogLevelFilter();
+    logLevelFilter1.setHosts(Collections.<String> emptyList());
+    logLevelFilter1.setDefaultLevels(Arrays.asList("FATAL", "ERROR", "WARN", 
"INFO"));
+    logLevelFilter1.setOverrideLevels(Collections.<String> emptyList());
+    
+    LogLevelFilter logLevelFilter2 = new LogLevelFilter();
+    logLevelFilter2.setHosts(Arrays.asList("host1"));
+    logLevelFilter2.setDefaultLevels(Arrays.asList("FATAL", "ERROR", "WARN", 
"INFO"));
+    logLevelFilter2.setOverrideLevels(Arrays.asList("FATAL", "ERROR", "WARN", 
"INFO", "DEBUG", "TRACE"));
+    logLevelFilter2.setExpiryTime(DateUtils.addDays(new Date(), 1));
+    
+    LogLevelFilter logLevelFilter3 = new LogLevelFilter();
+    logLevelFilter3.setHosts(Arrays.asList("host1"));
+    logLevelFilter3.setDefaultLevels(Arrays.asList("FATAL", "ERROR", "WARN", 
"INFO"));
+    logLevelFilter3.setOverrideLevels(Arrays.asList("FATAL", "ERROR", "WARN", 
"INFO", "DEBUG", "TRACE"));
+    logLevelFilter3.setExpiryTime(DateUtils.addDays(new Date(), -1));
+    
+    LogLevelFilterHandler h = new LogLevelFilterHandler();
+    h.setLogLevelFilter("configured_log_file1", logLevelFilter1);
+    h.setLogLevelFilter("configured_log_file2", logLevelFilter2);
+    h.setLogLevelFilter("configured_log_file3", logLevelFilter3);
   }
   
   @Test
   public void testLogConfigHandler_auditAllowed() throws Exception {
-    assertTrue(FilterLogData.INSTANCE.isAllowed("{'host':'host1', 
'type':'configured_log_file', 'level':'DEBUG'}",
+    assertTrue(FilterLogData.INSTANCE.isAllowed("{'host':'host1', 
'type':'configured_log_file1', 'level':'DEBUG'}",
         inputMarkerAudit));
   }
   
@@ -109,19 +106,25 @@ public class LogConfigHandlerTest {
   
   @Test
   public void testLogConfigHandler_notConfiguredLogAllowed() throws Exception {
-    assertTrue(FilterLogData.INSTANCE.isAllowed("{'host':'host1', 
'type':'not_configured_log_file', 'level':'INFO'}",
+    assertTrue(FilterLogData.INSTANCE.isAllowed("{'host':'host1', 
'type':'not_configured_log_file1', 'level':'WARN'}",
+        inputMarkerService));
+  }
+  
+  @Test
+  public void testLogConfigHandler_notConfiguredLogNotAllowed() throws 
Exception {
+    assertFalse(FilterLogData.INSTANCE.isAllowed("{'host':'host1', 
'type':'not_configured_log_file1', 'level':'TRACE'}",
         inputMarkerService));
   }
   
   @Test
   public void testLogConfigHandler_configuredDataAllow() throws Exception {
-    assertTrue(FilterLogData.INSTANCE.isAllowed("{'host':'host1', 
'type':'configured_log_file', 'level':'INFO'}",
+    assertTrue(FilterLogData.INSTANCE.isAllowed("{'host':'host1', 
'type':'configured_log_file1', 'level':'INFO'}",
         inputMarkerService));
   }
   
   @Test
   public void testLogConfigHandler_configuredDataDontAllow() throws Exception {
-    assertFalse(FilterLogData.INSTANCE.isAllowed("{'host':'host1', 
'type':'configured_log_file', 'level':'DEBUG'}",
+    assertFalse(FilterLogData.INSTANCE.isAllowed("{'host':'host1', 
'type':'configured_log_file1', 'level':'DEBUG'}",
         inputMarkerService));
   }
   
@@ -142,9 +145,4 @@ public class LogConfigHandlerTest {
     assertFalse(FilterLogData.INSTANCE.isAllowed("{'host':'host1', 
'type':'configured_log_file3', 'level':'DEBUG'}",
         inputMarkerService));
   }
-  
-  @AfterClass
-  public static void finish() {
-    verify(mockFetcher);
-  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-logfeeder/src/test/resources/logfeeder.properties
----------------------------------------------------------------------
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/resources/logfeeder.properties
 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/resources/logfeeder.properties
index 59020cc..19027d1 100644
--- 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/resources/logfeeder.properties
+++ 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/resources/logfeeder.properties
@@ -17,4 +17,5 @@ logfeeder.log.filter.enable=true
 logfeeder.solr.config.interval=5
 logfeeder.solr.zk_connect_string=some_connect_string
 logfeeder.metrics.collector.hosts=some_collector_host
-node.hostname=test_host_name
\ No newline at end of file
+node.hostname=test_host_name
+logfeeder.include.default.level=FATAL,ERROR,WARN
\ No newline at end of file

Reply via email to