This is an automated email from the ASF dual-hosted git repository.

oleewere pushed a commit to branch cloudbreak
in repository https://gitbox.apache.org/repos/asf/ambari-logsearch.git

commit 9f6b8053242bbd6c2c978b3ecfda149233a604e1
Author: Olivér Szabó <oleew...@gmail.com>
AuthorDate: Thu Nov 1 13:33:30 2018 +0100

    AMBARI-24833. Create cloud input/output skeleton. (#17)
    
    * AMBARI-24833. Create cloud input/output skeleton.
    
    * AMBARI-24833. Use LogFeederMode as enum by spring
    
    * AMBARI-24833. Fix review issues + add secret stores.
---
 .../ambari/logfeeder/plugin/input/Input.java       |  14 +
 ambari-logsearch-logfeeder/pom.xml                 |   6 +-
 .../logfeeder/common/LogEntryParseTester.java      |  11 +-
 .../logfeeder/common/LogFeederConstants.java       |   6 +-
 .../ambari/logfeeder/conf/ApplicationConfig.java   | 136 ++++++--
 .../ambari/logfeeder/conf/LogFeederMode.java       |  58 +++
 .../ambari/logfeeder/conf/LogFeederProps.java      |  18 +
 .../logfeeder/conf/LogFeederSecurityConfig.java    |  53 +--
 .../conf/condition/CloudStorageCondition.java      |  37 ++
 .../conf/condition/NonCloudStorageCondition.java   |  37 ++
 .../logfeeder/credential/CompositeSecretStore.java |  39 +++
 .../logfeeder/credential/EnvSecretStore.java       |  37 ++
 .../logfeeder/credential/FileSecretStore.java      |  60 ++++
 .../credential/HadoopCredentialSecretStore.java    |  52 +++
 .../logfeeder/credential/PropertySecretStore.java  |  36 ++
 .../ambari/logfeeder/credential/SecretStore.java   |  30 ++
 .../ambari/logfeeder/filter/FilterDummy.java       |  58 +++
 .../logfeeder/input/InputConfigUploader.java       |  26 +-
 .../apache/ambari/logfeeder/input/InputFile.java   |  16 +-
 .../ambari/logfeeder/input/InputManagerImpl.java   |  12 +-
 .../apache/ambari/logfeeder/input/InputSocket.java |   2 +-
 .../ambari/logfeeder/manager/BlockMerger.java      |  66 ++++
 .../logfeeder/manager/InputConfigHolder.java       |  80 +++++
 .../InputConfigManager.java}                       | 388 ++++++---------------
 .../manager/operations/InputConfigHandler.java     |  53 +++
 .../impl/CloudStorageInputConfigHandler.java       | 101 ++++++
 .../operations/impl/DefaultInputConfigHandler.java | 166 +++++++++
 .../ambari/logfeeder/metrics/StatsLogger.java      |  16 +-
 .../ambari/logfeeder/output/OutputManagerImpl.java |   1 +
 .../output/cloud/CloudStorageFactory.java          |  32 ++
 .../logfeeder/output/cloud/CloudStorageOutput.java |  30 ++
 .../output/cloud/CloudStorageOutputManager.java    | 102 ++++++
 .../ambari/logfeeder/output/cloud/HDFSOutput.java  |  74 ++++
 .../src/main/resources/logfeeder.properties        |   3 +
 .../ambari/logfeeder/output/OutputS3FileTest.java  |   3 -
 .../ambari/logfeeder/output/S3UploaderTest.java    |   4 -
 .../logfeeder/output/spool/LogSpoolerTest.java     |   8 -
 .../org/apache/ambari/logsearch/LogSearch.java     |   3 +-
 docker/test-config/logfeeder/logfeeder.properties  |   3 +-
 39 files changed, 1471 insertions(+), 406 deletions(-)

diff --git 
a/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/Input.java
 
b/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/Input.java
index 6228637..9ee4533 100644
--- 
a/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/Input.java
+++ 
b/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/Input.java
@@ -71,6 +71,7 @@ public abstract class Input<PROP_TYPE extends 
LogFeederProperties, INPUT_MARKER
   private LRUCache cache;
   private String cacheKeyField;
   private boolean initDefaultFields;
+  private boolean cloudInput = false;
   private MetricData readBytesMetric = new 
MetricData(getReadBytesMetricName(), false);
 
   /**
@@ -400,4 +401,17 @@ public abstract class Input<PROP_TYPE extends 
LogFeederProperties, INPUT_MARKER
   public void setInitDefaultFields(boolean initDefaultFields) {
     this.initDefaultFields = initDefaultFields;
   }
+
+  public boolean isCloudInput() {
+    return cloudInput;
+  }
+
+  public void setCloudInput(boolean cloudInput) {
+    this.cloudInput = cloudInput;
+  }
+
+  public String getCloudModeSuffix() {
+    String mode = isCloudInput() ? "cloud": "default";
+    return "mode=" + mode;
+  }
 }
diff --git a/ambari-logsearch-logfeeder/pom.xml 
b/ambari-logsearch-logfeeder/pom.xml
index 94af44f..71cf853 100644
--- a/ambari-logsearch-logfeeder/pom.xml
+++ b/ambari-logsearch-logfeeder/pom.xml
@@ -33,8 +33,8 @@
 
   <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-    <spring.version>5.1.1.RELEASE</spring.version>
-    <spring-boot.version>2.0.6.RELEASE</spring-boot.version>
+    <spring.version>5.1.2.RELEASE</spring.version>
+    <spring-boot.version>2.1.0.RELEASE</spring-boot.version>
   </properties>
 
   <dependencies>
@@ -96,7 +96,7 @@
     <dependency>
       <groupId>org.easymock</groupId>
       <artifactId>easymock</artifactId>
-      <version>3.6</version>
+      <version>4.0.1</version>
       <scope>test</scope>
     </dependency>
     <dependency>
diff --git 
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogEntryParseTester.java
 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogEntryParseTester.java
index b4a2a26..c4b9835 100644
--- 
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogEntryParseTester.java
+++ 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogEntryParseTester.java
@@ -27,9 +27,11 @@ import java.util.Map;
 
 import org.apache.ambari.logfeeder.conf.LogEntryCacheConfig;
 import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import 
org.apache.ambari.logfeeder.manager.operations.impl.DefaultInputConfigHandler;
 import org.apache.ambari.logfeeder.input.InputFileMarker;
 import org.apache.ambari.logfeeder.input.InputManagerImpl;
 import org.apache.ambari.logfeeder.loglevelfilter.LogLevelFilterHandler;
+import org.apache.ambari.logfeeder.manager.InputConfigManager;
 import org.apache.ambari.logfeeder.output.OutputManagerImpl;
 import org.apache.ambari.logfeeder.plugin.input.Input;
 import org.apache.ambari.logfeeder.plugin.input.InputMarker;
@@ -89,8 +91,6 @@ public class LogEntryParseTester {
   @SuppressWarnings("unchecked")
   public Map<String, Object> parse() throws Exception {
     InputConfig inputConfig = getInputConfig();
-    ConfigHandler configHandler = new ConfigHandler(null);
-    configHandler.setInputManager(new InputManagerImpl());
     OutputManagerImpl outputManager = new OutputManagerImpl();
     LogFeederProps logFeederProps = new LogFeederProps();
     LogEntryCacheConfig logEntryCacheConfig = new LogEntryCacheConfig();
@@ -101,8 +101,11 @@ public class LogEntryParseTester {
     LogLevelFilterHandler logLevelFilterHandler = new 
LogLevelFilterHandler(null);
     logLevelFilterHandler.setLogFeederProps(logFeederProps);
     outputManager.setLogLevelFilterHandler(logLevelFilterHandler);
-    configHandler.setOutputManager(outputManager);
-    Input input = configHandler.getTestInput(inputConfig, logId);
+    DefaultInputConfigHandler configHandler = new DefaultInputConfigHandler();
+    InputConfigManager inputConfigManager = new InputConfigManager(
+      null, new InputManagerImpl(), outputManager, 
configHandler,logFeederProps, true
+    );
+    Input input = inputConfigManager.getTestInput(inputConfig, logId);
     input.init(logFeederProps);
     final Map<String, Object> result = new HashMap<>();
     input.getFirstFilter().init(logFeederProps);
diff --git 
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
index 1d56924..a9790b2 100644
--- 
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
+++ 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
@@ -23,7 +23,9 @@ public class LogFeederConstants {
   public static final String ALL = "all";
   public static final String LOGFEEDER_FILTER_NAME = "log_feeder_config";
   public static final String LOG_LEVEL_UNKNOWN = "UNKNOWN";
-  
+
+  public static final String CLOUD_PREFIX = "cl-";
+
   // solr fields
   public static final String SOLR_LEVEL = "level";
   public static final String SOLR_COMPONENT = "type";
@@ -107,4 +109,6 @@ public class LogFeederConstants {
   public static final String SOLR_ZK_CONNECTION_STRING = 
"logfeeder.solr.zk_connect_string";
   public static final String SOLR_URLS = "logfeeder.solr.urls";
 
+  public static final String CLOUD_STORAGE_MODE = 
"logfeeder.cloud.storage.mode";
+
 }
diff --git 
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java
 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java
index 086ad70..881b856 100644
--- 
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java
+++ 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,14 +20,20 @@ package org.apache.ambari.logfeeder.conf;
 
 import com.google.common.collect.Maps;
 import org.apache.ambari.logfeeder.common.LogFeederSolrClientFactory;
+import org.apache.ambari.logfeeder.conf.condition.CloudStorageCondition;
+import org.apache.ambari.logfeeder.conf.condition.NonCloudStorageCondition;
 import org.apache.ambari.logfeeder.container.docker.DockerContainerRegistry;
 import org.apache.ambari.logfeeder.common.LogFeederConstants;
+import org.apache.ambari.logfeeder.manager.operations.InputConfigHandler;
+import 
org.apache.ambari.logfeeder.manager.operations.impl.CloudStorageInputConfigHandler;
 import org.apache.ambari.logfeeder.input.InputConfigUploader;
 import org.apache.ambari.logfeeder.input.InputManagerImpl;
+import org.apache.ambari.logfeeder.manager.InputConfigManager;
+import org.apache.ambari.logfeeder.output.cloud.CloudStorageOutputManager;
 import org.apache.ambari.logfeeder.plugin.manager.CheckpointManager;
 import org.apache.ambari.logfeeder.input.file.checkpoint.FileCheckpointManager;
 import org.apache.ambari.logfeeder.loglevelfilter.LogLevelFilterHandler;
-import org.apache.ambari.logfeeder.common.ConfigHandler;
+import 
org.apache.ambari.logfeeder.manager.operations.impl.DefaultInputConfigHandler;
 import org.apache.ambari.logfeeder.metrics.MetricsManager;
 import org.apache.ambari.logfeeder.metrics.StatsLogger;
 import org.apache.ambari.logfeeder.output.OutputManagerImpl;
@@ -44,6 +50,7 @@ import 
org.apache.ambari.logsearch.config.zookeeper.LogLevelFilterManagerZK;
 import org.apache.ambari.logsearch.config.zookeeper.LogSearchConfigLogFeederZK;
 import org.apache.solr.client.solrj.SolrClient;
 import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Conditional;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.DependsOn;
 import org.springframework.context.annotation.PropertySource;
@@ -72,13 +79,42 @@ public class ApplicationConfig {
   }
 
   @Bean
+  public CheckpointManager checkpointHandler() {
+    return new FileCheckpointManager();
+  }
+
+  @Bean
+  public DockerContainerRegistry containerRegistry() {
+    if (logFeederProps.isDockerContainerRegistryEnabled()) {
+      return 
DockerContainerRegistry.getInstance(logFeederProps.getProperties());
+    } else {
+      return null;
+    }
+  }
+
+  @Bean
+  public MetricsManager metricsManager() {
+    return new MetricsManager();
+  }
+
+  // Non-cloud configurations
+
+  @Bean
+  @Conditional(NonCloudStorageCondition.class)
+  public StatsLogger statsLogger() throws Exception {
+    return new StatsLogger("statsLogger", inputConfigManager());
+  }
+
+  @Bean
   @DependsOn({"logSearchConfigLogFeeder", "propertyConfigurer"})
-  public ConfigHandler configHandler() throws Exception {
-    return new ConfigHandler(logSearchConfigLogFeeder());
+  @Conditional(NonCloudStorageCondition.class)
+  public DefaultInputConfigHandler inputConfigHandler() throws Exception {
+    return new DefaultInputConfigHandler();
   }
 
   @Bean
   @DependsOn("logFeederSecurityConfig")
+  @Conditional(NonCloudStorageCondition.class)
   public LogSearchConfigLogFeeder logSearchConfigLogFeeder() throws Exception {
     if (logFeederProps.isUseLocalConfigs()) {
       LogSearchConfigLogFeeder logfeederConfig = 
LogSearchConfigFactory.createLogSearchConfigLogFeeder(
@@ -96,6 +132,7 @@ public class ApplicationConfig {
   }
 
   @Bean
+  @Conditional(NonCloudStorageCondition.class)
   public LogLevelFilterManager logLevelFilterManager() throws Exception {
     if (logFeederProps.isSolrFilterStorage()) {
       SolrClient solrClient = new 
LogFeederSolrClientFactory().createSolrClient(
@@ -107,13 +144,14 @@ public class ApplicationConfig {
         map.put(name, logFeederProps.getProperties().getProperty(name));
       }
       return new LogLevelFilterManagerZK(map);
-    } else { // no default filter manager
+    } else {
       return null;
     }
   }
 
   @Bean
   @DependsOn("logLevelFilterHandler")
+  @Conditional(NonCloudStorageCondition.class)
   public LogLevelFilterUpdater logLevelFilterUpdater() throws Exception {
     if (logFeederProps.isSolrFilterStorage() && 
logFeederProps.isSolrFilterMonitor()) {
       LogLevelFilterUpdater logLevelFilterUpdater = new 
LogLevelFilterUpdaterSolr(
@@ -121,56 +159,92 @@ public class ApplicationConfig {
         30, (LogLevelFilterManagerSolr) logLevelFilterManager(), 
logFeederProps.getClusterName());
       logLevelFilterUpdater.start();
       return logLevelFilterUpdater;
-    } else { // no default filter updater
-      return null;
     }
-  }
-  @Bean
-  public MetricsManager metricsManager() {
-    return new MetricsManager();
+    return null;
   }
 
   @Bean
-  @DependsOn("configHandler")
+  @Conditional(NonCloudStorageCondition.class)
   public LogLevelFilterHandler logLevelFilterHandler() throws Exception {
     return new LogLevelFilterHandler(logSearchConfigLogFeeder());
   }
 
   @Bean
-  @DependsOn({"configHandler", "logSearchConfigLogFeeder", 
"logLevelFilterHandler"})
-  public InputConfigUploader inputConfigUploader() {
-    return new InputConfigUploader();
+  @Conditional(NonCloudStorageCondition.class)
+  @DependsOn({"inputConfigHandler"})
+  public InputConfigUploader inputConfigUploader() throws Exception {
+    return new InputConfigUploader("Input Config Loader", 
logSearchConfigLogFeeder(),
+      inputConfigManager(), logLevelFilterHandler());
+  }
+
+  @Bean
+  @DependsOn({"containerRegistry", "checkpointHandler"})
+  @Conditional(NonCloudStorageCondition.class)
+  public InputManager inputManager() {
+    return new InputManagerImpl("InputIsNotReadyMonitor");
+  }
+
+  @Bean
+  @Conditional(NonCloudStorageCondition.class)
+  public OutputManager outputManager() throws Exception {
+    return new OutputManagerImpl();
   }
 
   @Bean
-  @DependsOn("inputConfigUploader")
-  public StatsLogger statsLogger() {
-    return new StatsLogger();
+  @Conditional(NonCloudStorageCondition.class)
+  public InputConfigManager inputConfigManager() throws Exception {
+    return new InputConfigManager(logSearchConfigLogFeeder(), inputManager(), 
outputManager(),
+      inputConfigHandler(), logFeederProps, true);
   }
 
+  // Cloud configurations
+
+  @Bean(name = "cloudLogSearchLogFeederConfig")
+  @Conditional(CloudStorageCondition.class)
+  public LogSearchConfigLogFeeder cloudLogSearchLogFeederConfig() throws 
Exception {
+    return LogSearchConfigFactory.createLogSearchConfigLogFeeder(
+      Maps.fromProperties(logFeederProps.getProperties()),
+      logFeederProps.getClusterName(),
+      LogSearchConfigLogFeederLocal.class, false);
+  }
+
+  @Bean
+  @Conditional(CloudStorageCondition.class)
+  @DependsOn({"cloudInputConfigHandler"})
+  public InputConfigUploader cloudInputConfigUploader() throws Exception {
+    return new InputConfigUploader("Cloud Input Config Loader", 
cloudLogSearchLogFeederConfig(),
+      cloudInputConfigManager(),null);
+  }
 
   @Bean
   @DependsOn({"containerRegistry", "checkpointHandler"})
-  public InputManager inputManager() {
-    return new InputManagerImpl();
+  @Conditional(CloudStorageCondition.class)
+  public InputManager cloudInputManager() {
+    return new InputManagerImpl("CloudInputIsNotReady");
   }
 
   @Bean
-  public OutputManager outputManager() {
-    return new OutputManagerImpl();
+  @Conditional(CloudStorageCondition.class)
+  public OutputManager cloudOutputManager() throws Exception {
+    return new CloudStorageOutputManager();
   }
 
   @Bean
-  public CheckpointManager checkpointHandler() {
-    return new FileCheckpointManager();
+  @Conditional(CloudStorageCondition.class)
+  public InputConfigHandler cloudInputConfigHandler() {
+    return new CloudStorageInputConfigHandler();
   }
 
   @Bean
-  public DockerContainerRegistry containerRegistry() {
-    if (logFeederProps.isDockerContainerRegistryEnabled()) {
-      return 
DockerContainerRegistry.getInstance(logFeederProps.getProperties());
-    } else {
-      return null;
-    }
+  @Conditional(CloudStorageCondition.class)
+  public InputConfigManager cloudInputConfigManager() throws Exception {
+    return new InputConfigManager(cloudLogSearchLogFeederConfig(), 
cloudInputManager(), cloudOutputManager(),
+      cloudInputConfigHandler(), logFeederProps, false);
+  }
+
+  @Bean
+  @Conditional(CloudStorageCondition.class)
+  public StatsLogger cloudStatsLogger() throws Exception {
+    return new StatsLogger("cloudStatsLogger", cloudInputConfigManager());
   }
 }
diff --git 
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederMode.java
 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederMode.java
new file mode 100644
index 0000000..329f066
--- /dev/null
+++ 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederMode.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.conf;
+
+/**
+ * Global Log Feeder modes:
+ * <pre>
+ * - default: process logs based on input / filter / output JSON configurations
+ * - cloud: process logs based on input JSON configurations and send them 
directly into cloud storage (without filters)
+ * - hybrid: use both 2 above (together)
+ * </pre>
+ */
+public enum LogFeederMode {
+  DEFAULT("default"), CLOUD("cloud"), HYBRID("hybrid");
+
+  private String text;
+
+  LogFeederMode(String text) {
+    this.text = text;
+  }
+
+  public String getText() {
+    return this.text;
+  }
+
+  public static LogFeederMode fromString(String text) {
+    for (LogFeederMode mode : LogFeederMode.values()) {
+      if (mode.text.equalsIgnoreCase(text)) {
+        return mode;
+      }
+    }
+    throw new IllegalArgumentException(String.format("String '%s' cannot be 
converted to LogFeederMode enum", text));
+  }
+
+  public static boolean isCloudStorage(LogFeederMode mode) {
+    return LogFeederMode.HYBRID.equals(mode) || 
LogFeederMode.CLOUD.equals(mode);
+  }
+
+  public static boolean isNonCloudStorage(LogFeederMode mode) {
+    return LogFeederMode.HYBRID.equals(mode) || 
LogFeederMode.DEFAULT.equals(mode);
+  }
+}
diff --git 
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
index 859de8f..dc1bfd2 100644
--- 
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
+++ 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
@@ -199,6 +199,16 @@ public class LogFeederProps implements LogFeederProperties 
{
   @Value("${" + LogFeederConstants.SOLR_URLS + ":}")
   private String solrUrlsStr;
 
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.CLOUD_STORAGE_MODE,
+    description = "Option to support sending logs to cloud storage. You can 
choose between supporting only cloud storage, non-cloud storage or both",
+    examples = {"default", "cloud", "hybrid"},
+    defaultValue = "default",
+    sources = {LogFeederConstants.CLOUD_STORAGE_MODE}
+  )
+  @Value("${" + LogFeederConstants.CLOUD_STORAGE_MODE + ":default}")
+  public LogFeederMode cloudStorageMode;
+
   @Inject
   private LogEntryCacheConfig logEntryCacheConfig;
 
@@ -352,6 +362,14 @@ public class LogFeederProps implements LogFeederProperties 
{
     this.zkFilterStorage = zkFilterStorage;
   }
 
+  public LogFeederMode getCloudStorageMode() {
+    return cloudStorageMode;
+  }
+
+  public void setCloudStorageMode(LogFeederMode cloudStorageMode) {
+    this.cloudStorageMode = cloudStorageMode;
+  }
+
   public String[] getSolrUrls() {
     if (StringUtils.isNotBlank(this.solrUrlsStr)) {
       return this.solrUrlsStr.split(",");
diff --git 
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederSecurityConfig.java
 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederSecurityConfig.java
index aca1109..e047f60 100644
--- 
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederSecurityConfig.java
+++ 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederSecurityConfig.java
@@ -19,17 +19,18 @@
 package org.apache.ambari.logfeeder.conf;
 
 import org.apache.ambari.logfeeder.common.LogFeederConstants;
+import org.apache.ambari.logfeeder.credential.CompositeSecretStore;
+import org.apache.ambari.logfeeder.credential.FileSecretStore;
+import org.apache.ambari.logfeeder.credential.HadoopCredentialSecretStore;
+import org.apache.ambari.logfeeder.credential.SecretStore;
 import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription;
-import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang3.ArrayUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.springframework.beans.factory.annotation.Value;
 
 import javax.annotation.PostConstruct;
 import java.io.File;
-import java.nio.charset.Charset;
 
 public class LogFeederSecurityConfig {
 
@@ -142,48 +143,12 @@ public class LogFeederSecurityConfig {
   }
 
   private String getPassword(String propertyName, String fileName) {
-    String credentialStorePassword = 
getPasswordFromCredentialStore(propertyName);
-    if (credentialStorePassword != null) {
-      return credentialStorePassword;
-    }
-
-    String filePassword = getPasswordFromFile(fileName);
-    if (filePassword != null) {
-      return filePassword;
-    }
-
-    return LOGFEEDER_STORE_DEFAULT_PASSWORD;
-  }
+    SecretStore hadoopSecretStore = new 
HadoopCredentialSecretStore(propertyName, credentialStoreProviderPath);
+    SecretStore fileSecretStore = new 
FileSecretStore(String.join(File.separator, LOGFEEDER_CERT_DEFAULT_FOLDER, 
fileName), LOGFEEDER_STORE_DEFAULT_PASSWORD);
+    SecretStore compositeSecretStore = new 
CompositeSecretStore(hadoopSecretStore, fileSecretStore);
 
-  private String getPasswordFromCredentialStore(String propertyName) {
-    try {
-      if (StringUtils.isEmpty(credentialStoreProviderPath)) {
-        return null;
-      }
-
-      org.apache.hadoop.conf.Configuration config = new 
org.apache.hadoop.conf.Configuration();
-      config.set(CREDENTIAL_STORE_PROVIDER_PATH_PROPERTY, 
credentialStoreProviderPath);
-      char[] passwordChars = config.getPassword(propertyName);
-      return (ArrayUtils.isNotEmpty(passwordChars)) ? new 
String(passwordChars) : null;
-    } catch (Exception e) {
-      logger.warn(String.format("Could not load password %s from credential 
store, using default password", propertyName));
-      return null;
-    }
-  }
-
-  private String getPasswordFromFile(String fileName) {
-    try {
-      File pwdFile = new File(LOGFEEDER_CERT_DEFAULT_FOLDER, fileName);
-      if (!pwdFile.exists()) {
-        FileUtils.writeStringToFile(pwdFile, LOGFEEDER_STORE_DEFAULT_PASSWORD, 
Charset.defaultCharset());
-        return LOGFEEDER_STORE_DEFAULT_PASSWORD;
-      } else {
-        return FileUtils.readFileToString(pwdFile, Charset.defaultCharset());
-      }
-    } catch (Exception e) {
-      logger.warn("Exception occurred during read/write password file for 
keystore/truststore.", e);
-      return null;
-    }
+    char[] password = compositeSecretStore.getSecret();
+    return password == null ? LOGFEEDER_STORE_DEFAULT_PASSWORD: new 
String(password);
   }
 
 }
diff --git 
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/condition/CloudStorageCondition.java
 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/condition/CloudStorageCondition.java
new file mode 100644
index 0000000..3860699
--- /dev/null
+++ 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/condition/CloudStorageCondition.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.conf.condition;
+
+import org.apache.ambari.logfeeder.common.LogFeederConstants;
+import org.apache.ambari.logfeeder.conf.LogFeederMode;
+import org.springframework.context.annotation.Condition;
+import org.springframework.context.annotation.ConditionContext;
+import org.springframework.core.type.AnnotatedTypeMetadata;
+
+/**
+ * Global condition that checks is the application started in cloud or hybrid 
mode
+ */
+public class CloudStorageCondition implements Condition {
+
+  @Override
+  public boolean matches(ConditionContext context, AnnotatedTypeMetadata 
metadata) {
+    return LogFeederMode.isCloudStorage(LogFeederMode.fromString(
+      
context.getEnvironment().getProperty(LogFeederConstants.CLOUD_STORAGE_MODE)));
+  }
+}
diff --git 
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/condition/NonCloudStorageCondition.java
 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/condition/NonCloudStorageCondition.java
new file mode 100644
index 0000000..fee0efa
--- /dev/null
+++ 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/condition/NonCloudStorageCondition.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.conf.condition;
+
+import org.apache.ambari.logfeeder.common.LogFeederConstants;
+import org.apache.ambari.logfeeder.conf.LogFeederMode;
+import org.springframework.context.annotation.Condition;
+import org.springframework.context.annotation.ConditionContext;
+import org.springframework.core.type.AnnotatedTypeMetadata;
+
+/**
+ * Global condition that checks is the application started in default or 
hybrid mode
+ */
+public class NonCloudStorageCondition implements Condition {
+
+  @Override
+  public boolean matches(ConditionContext context, AnnotatedTypeMetadata 
metadata) {
+    return LogFeederMode.isNonCloudStorage(LogFeederMode.fromString(
+      
context.getEnvironment().getProperty(LogFeederConstants.CLOUD_STORAGE_MODE)));
+  }
+}
diff --git 
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/credential/CompositeSecretStore.java
 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/credential/CompositeSecretStore.java
new file mode 100644
index 0000000..7edaf26
--- /dev/null
+++ 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/credential/CompositeSecretStore.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.credential;
+
+public class CompositeSecretStore implements SecretStore {
+
+  private SecretStore[] secretStores;
+
+  public CompositeSecretStore(SecretStore... secretStores) {
+    this.secretStores = secretStores;
+  }
+
+  @Override
+  public char[] getSecret() {
+    for (SecretStore secretStore : secretStores) {
+      char[] secret = secretStore.getSecret();
+      if (secret != null) {
+        return secret;
+      }
+    }
+    return null;
+  }
+}
diff --git 
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/credential/EnvSecretStore.java
 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/credential/EnvSecretStore.java
new file mode 100644
index 0000000..5d82ee1
--- /dev/null
+++ 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/credential/EnvSecretStore.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.credential;
+
+public class EnvSecretStore implements SecretStore {
+
+  private final String property;
+
+  public EnvSecretStore(String property) {
+    this.property = property;
+  }
+
+  @Override
+  public char[] getSecret() {
+    String envValue = System.getenv(property);
+    if (envValue != null) {
+      return envValue.toCharArray();
+    }
+    return null;
+  }
+}
diff --git 
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/credential/FileSecretStore.java
 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/credential/FileSecretStore.java
new file mode 100644
index 0000000..b9687e0
--- /dev/null
+++ 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/credential/FileSecretStore.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.credential;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.File;
+import java.nio.charset.Charset;
+
+public class FileSecretStore implements SecretStore {
+
+  private static final Logger logger = 
LogManager.getLogger(FileSecretStore.class);
+
+  private final String fileLocation;
+  private final String defaultSecret;
+
+  public FileSecretStore(String fileLocation, String defaultSecret) {
+    this.fileLocation = fileLocation;
+    this.defaultSecret = defaultSecret;
+  }
+
+  public FileSecretStore(String fileLocation) {
+    this.fileLocation = fileLocation;
+    this.defaultSecret = null;
+  }
+
+  @Override
+  public char[] getSecret() {
+    try {
+      File pwdFile = new File(fileLocation);
+      if (!pwdFile.exists() && defaultSecret != null) {
+        FileUtils.writeStringToFile(pwdFile, defaultSecret, 
Charset.defaultCharset());
+        return defaultSecret.toCharArray();
+      } else {
+        return FileUtils.readFileToString(pwdFile, 
Charset.defaultCharset()).toCharArray();
+      }
+    } catch (Exception e) {
+      logger.warn("Exception occurred during read/write password file.", e);
+      return null;
+    }
+  }
+}
diff --git 
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/credential/HadoopCredentialSecretStore.java
 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/credential/HadoopCredentialSecretStore.java
new file mode 100644
index 0000000..7e1237e
--- /dev/null
+++ 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/credential/HadoopCredentialSecretStore.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.credential;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class HadoopCredentialSecretStore implements SecretStore {
+
+  private static final Logger logger = 
LogManager.getLogger(HadoopCredentialSecretStore.class);
+  private static final String CREDENTIAL_STORE_PROVIDER_PATH_PROPERTY = 
"hadoop.security.credential.provider.path";
+
+  private final String credentialStoreProviderPath;
+  private final String property;
+
+  public HadoopCredentialSecretStore(String property, String 
credentialStoreProviderPath) {
+    this.property = property;
+    this.credentialStoreProviderPath = credentialStoreProviderPath;
+  }
+
+  @Override
+  public char[] getSecret() {
+    try {
+      if (StringUtils.isBlank(credentialStoreProviderPath)) {
+        return null;
+      }
+      org.apache.hadoop.conf.Configuration config = new 
org.apache.hadoop.conf.Configuration();
+      config.set(CREDENTIAL_STORE_PROVIDER_PATH_PROPERTY, 
credentialStoreProviderPath);
+      return config.getPassword(property);
+    } catch (Exception e) {
+      logger.warn("Could not load password {} from credential store.", 
property);
+      return null;
+    }
+  }
+}
diff --git 
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/credential/PropertySecretStore.java
 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/credential/PropertySecretStore.java
new file mode 100644
index 0000000..046fe7c
--- /dev/null
+++ 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/credential/PropertySecretStore.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.credential;
+
+public class PropertySecretStore implements SecretStore {
+  private final String property;
+
+  public PropertySecretStore(String property) {
+    this.property = property;
+  }
+
+  @Override
+  public char[] getSecret() {
+    String propValue = System.getProperty(property);
+    if (propValue != null) {
+      return propValue.toCharArray();
+    }
+    return null;
+  }
+}
diff --git 
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/credential/SecretStore.java
 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/credential/SecretStore.java
new file mode 100644
index 0000000..c257ff4
--- /dev/null
+++ 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/credential/SecretStore.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.credential;
+
+/**
+ * Store secrets in character array
+ */
+public interface SecretStore {
+  /**
+   * Gather a secret - implement the way
+   * @return secret character array
+   */
+  char[] getSecret();
+}
diff --git 
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterDummy.java
 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterDummy.java
new file mode 100644
index 0000000..e1946f8
--- /dev/null
+++ 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterDummy.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.filter;
+
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.input.InputFile;
+import org.apache.ambari.logfeeder.plugin.filter.Filter;
+import org.apache.ambari.logfeeder.plugin.input.Input;
+import org.apache.ambari.logfeeder.plugin.input.InputMarker;
+import 
org.apache.ambari.logsearch.config.api.model.inputconfig.InputFileDescriptor;
+import org.apache.commons.lang3.BooleanUtils;
+
+/**
+ * Simple dummy filter, not supported by config api, create it manually
+ */
+public class FilterDummy extends Filter<LogFeederProps> {
+
+  private boolean dockerEnabled = false;
+
+  @Override
+  public void init(LogFeederProps logFeederProps) throws Exception {
+    if (logFeederProps.isDockerContainerRegistryEnabled()) {
+      Input input = getInput();
+      if (input instanceof InputFile) {
+        dockerEnabled = 
BooleanUtils.toBooleanDefaultIfNull(((InputFileDescriptor) 
input.getInputDescriptor()).getDockerEnabled(), false);
+      }
+    }
+  }
+
+  @Override
+  public void apply(String inputStr, InputMarker inputMarker) throws Exception 
{
+    if (dockerEnabled) {
+      inputStr = DockerLogFilter.getLogFromDockerJson(inputStr);
+    }
+    super.apply(inputStr, inputMarker);
+  }
+
+  @Override
+  public String getShortDescription() {
+    return "filter:filter=dummy";
+  }
+}
diff --git 
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java
 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java
index 57f5b3d..283273a 100644
--- 
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java
+++ 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java
@@ -20,8 +20,8 @@ package org.apache.ambari.logfeeder.input;
 
 import com.google.common.io.Files;
 import org.apache.ambari.logfeeder.loglevelfilter.LogLevelFilterHandler;
-import org.apache.ambari.logfeeder.common.ConfigHandler;
 import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.manager.InputConfigManager;
 import org.apache.ambari.logsearch.config.api.LogSearchConfigLogFeeder;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -50,19 +50,17 @@ public class InputConfigUploader extends Thread {
   private final Pattern serviceNamePattern = 
Pattern.compile("input.config-(.+).json");
 
   @Inject
-  private LogSearchConfigLogFeeder config;
-
-  @Inject
   private LogFeederProps logFeederProps;
 
-  @Inject
-  private LogLevelFilterHandler logLevelFilterHandler;
-
-  @Inject
-  private ConfigHandler configHandler;
+  private final InputConfigManager inputConfigManager;
+  private final LogSearchConfigLogFeeder config;
+  private final LogLevelFilterHandler logLevelFilterHandler;
 
-  public InputConfigUploader() {
-    super("Input Config Loader");
+  public InputConfigUploader(String name, LogSearchConfigLogFeeder config, 
InputConfigManager inputConfigManager, LogLevelFilterHandler 
logLevelFilterHandler) {
+    super(name);
+    this.config = config;
+    this.inputConfigManager = inputConfigManager;
+    this.logLevelFilterHandler = logLevelFilterHandler;
     setDaemon(true);
   }
 
@@ -70,7 +68,9 @@ public class InputConfigUploader extends Thread {
   public void init() throws Exception {
     this.configDir = new File(logFeederProps.getConfDir());
     this.start();
-    config.monitorInputConfigChanges(configHandler, logLevelFilterHandler, 
logFeederProps.getClusterName());
+    if (config != null) {
+      config.monitorInputConfigChanges(inputConfigManager, 
logLevelFilterHandler, logFeederProps.getClusterName());
+    }
   }
 
   @Override
@@ -85,7 +85,7 @@ public class InputConfigUploader extends Thread {
               m.find();
               String serviceName = m.group(1);
               String inputConfig = Files.toString(inputConfigFile, 
Charset.defaultCharset());
-              if (!config.inputConfigExists(serviceName)) {
+              if (config != null && !config.inputConfigExists(serviceName)) {
                 config.createInputConfig(logFeederProps.getClusterName(), 
serviceName, inputConfig);
               }
               filesHandled.add(inputConfigFile.getAbsolutePath());
diff --git 
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
index b8eb5e9..64428f6 100644
--- 
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
+++ 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
@@ -18,6 +18,7 @@
  */
 package org.apache.ambari.logfeeder.input;
 
+import org.apache.ambari.logfeeder.common.LogFeederConstants;
 import org.apache.ambari.logfeeder.conf.LogEntryCacheConfig;
 import org.apache.ambari.logfeeder.conf.LogFeederProps;
 import org.apache.ambari.logfeeder.container.docker.DockerContainerRegistry;
@@ -106,7 +107,7 @@ public class InputFile extends Input<LogFeederProps, 
InputFileMarker, InputFileB
         if (dockerContainerRegistry != null) {
           Map<String, Map<String, DockerMetadata>> metadataMap = 
dockerContainerRegistry.getContainerMetadataMap();
           String logType = getLogType();
-          if (metadataMap.containsKey(logType)) {
+          if (metadataMap.containsKey(StringUtils.removeStart(logType, 
LogFeederConstants.CLOUD_PREFIX))) {
             isReady = true;
           }
         } else {
@@ -140,12 +141,12 @@ public class InputFile extends Input<LogFeederProps, 
InputFileMarker, InputFileB
   public String getNameForThread() {
     if (filePath != null) {
       try {
-        return (getType() + "=" + (new File(filePath)).getName());
+        return (getType() + "=" + (new File(filePath)).getName() + ";" + 
getCloudModeSuffix());
       } catch (Throwable ex) {
         logger.warn("Couldn't get basename for filePath=" + filePath, ex);
       }
     }
-    return super.getNameForThread() + ":" + getType();
+    return super.getNameForThread() + ":" + getType() + ";" + 
getCloudModeSuffix();
   }
 
   @Override
@@ -177,8 +178,9 @@ public class InputFile extends Input<LogFeederProps, 
InputFileMarker, InputFileB
         Map<String, Map<String, DockerMetadata>> metadataMap = 
dockerContainerRegistry.getContainerMetadataMap();
         String logType = getLogType();
         threadGroup = new ThreadGroup("docker-parent-" + logType);
-        if (metadataMap.containsKey(logType)) {
-          Map<String, DockerMetadata> dockerMetadataMap = 
metadataMap.get(logType);
+        String replacedLogType = StringUtils.removeStart(logType, 
LogFeederConstants.CLOUD_PREFIX);
+        if (metadataMap.containsKey(replacedLogType)) {
+          Map<String, DockerMetadata> dockerMetadataMap = 
metadataMap.get(replacedLogType);
           for (Map.Entry<String, DockerMetadata> dockerMetadataEntry : 
dockerMetadataMap.entrySet()) {
             try {
               
startNewChildDockerInputFileThread(dockerMetadataEntry.getValue());
@@ -198,9 +200,9 @@ public class InputFile extends Input<LogFeederProps, 
InputFileMarker, InputFileB
             for (Map.Entry<String, List<File>> folderFileEntry : 
getFolderMap().entrySet()) {
               startNewChildInputFileThread(folderFileEntry);
             }
-            logFilePathUpdaterThread = new Thread(new 
LogFilePathUpdateMonitor((InputFile) this, pathUpdateIntervalMin, 
detachTimeMin), "logfile_path_updater=" + filePath);
+            logFilePathUpdaterThread = new Thread(new 
LogFilePathUpdateMonitor((InputFile) this, pathUpdateIntervalMin, 
detachTimeMin), String.format("logfile_path_updater=%s;%s", filePath, 
getCloudModeSuffix()));
             logFilePathUpdaterThread.setDaemon(true);
-            logFileDetacherThread = new Thread(new 
LogFileDetachMonitor((InputFile) this, detachIntervalMin, detachTimeMin), 
"logfile_detacher=" + filePath);
+            logFileDetacherThread = new Thread(new 
LogFileDetachMonitor((InputFile) this, detachIntervalMin, detachTimeMin), 
String.format("logfile_detacher=%s;%s", filePath, getCloudModeSuffix()));
             logFileDetacherThread.setDaemon(true);
 
             logFilePathUpdaterThread.start();
diff --git 
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManagerImpl.java
 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManagerImpl.java
index 70e54d6..bd3e045 100644
--- 
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManagerImpl.java
+++ 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManagerImpl.java
@@ -62,6 +62,16 @@ public class InputManagerImpl extends InputManager {
     return inputs.get(serviceName);
   }
 
+  private final String notReadyThreadName;
+
+  public InputManagerImpl() {
+    this.notReadyThreadName = "InputIsNotReadyMonitor";
+  }
+
+  public InputManagerImpl(String notReadyThreadName) {
+    this.notReadyThreadName = notReadyThreadName;
+  }
+
   @Override
   public void add(String serviceName, Input input) {
     List<Input> inputList = inputs.computeIfAbsent(serviceName, k -> new 
ArrayList<>());
@@ -130,7 +140,7 @@ public class InputManagerImpl extends InputManager {
   }
 
   private void startMonitorThread() {
-    Thread inputIsReadyMonitor = new Thread("InputIsReadyMonitor") {
+    Thread inputIsReadyMonitor = new Thread(notReadyThreadName) {
       @Override
       public void run() {
         logger.info("Going to monitor for these missing files: " + 
notReadyList.toString());
diff --git 
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSocket.java
 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSocket.java
index 965aa84..1a15395 100644
--- 
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSocket.java
+++ 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSocket.java
@@ -123,7 +123,7 @@ public class InputSocket extends Input<LogFeederProps, 
InputSocketMarker, InputS
 
   @Override
   public String getNameForThread() {
-    return String.format("socket=%s-%s-%s", getLogType(), this.protocol, 
this.port);
+    return String.format("socket=%s-%s-%s;%s", getLogType(), this.protocol, 
this.port, getCloudModeSuffix());
   }
 
   @Override
diff --git 
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/BlockMerger.java
 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/BlockMerger.java
new file mode 100644
index 0000000..c5a9172
--- /dev/null
+++ 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/BlockMerger.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.manager;
+
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Helper for merge global and input configurations
+ */
+public class BlockMerger {
+  private BlockMerger() {
+  }
+
+  @SuppressWarnings("unchecked")
+  public static void mergeBlocks(Map<String, Object> fromMap, Map<String, 
Object> toMap) {
+    for (String key : fromMap.keySet()) {
+      Object objValue = fromMap.get(key);
+      if (objValue == null) {
+        continue;
+      }
+      if (objValue instanceof Map) {
+        Map<String, Object> globalFields = 
LogFeederUtil.cloneObject((Map<String, Object>) objValue);
+
+        Map<String, Object> localFields = (Map<String, Object>) toMap.get(key);
+        if (localFields == null) {
+          localFields = new HashMap<>();
+          toMap.put(key, localFields);
+        }
+
+        if (globalFields != null) {
+          for (String fieldKey : globalFields.keySet()) {
+            if (!localFields.containsKey(fieldKey)) {
+              localFields.put(fieldKey, globalFields.get(fieldKey));
+            }
+          }
+        }
+      }
+    }
+
+    // Let's add the rest of the top level fields if missing
+    for (String key : fromMap.keySet()) {
+      if (!toMap.containsKey(key)) {
+        toMap.put(key, fromMap.get(key));
+      }
+    }
+  }
+}
diff --git 
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/InputConfigHolder.java
 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/InputConfigHolder.java
new file mode 100644
index 0000000..35ad1bd
--- /dev/null
+++ 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/InputConfigHolder.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.manager;
+
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.plugin.manager.InputManager;
+import org.apache.ambari.logfeeder.plugin.manager.OutputManager;
+import org.apache.ambari.logsearch.config.api.LogSearchConfigLogFeeder;
+import 
org.apache.ambari.logsearch.config.api.model.inputconfig.FilterDescriptor;
+import 
org.apache.ambari.logsearch.config.api.model.inputconfig.InputDescriptor;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Holds common configuration/manager objects for input config 
managers/handlers in order to provide 1 object as input (instead of many)
+ */
+public class InputConfigHolder {
+
+  private final LogFeederProps logFeederProps;
+  private final LogSearchConfigLogFeeder config;
+  private final List<InputDescriptor> inputConfigList = new ArrayList<>();
+  private final List<FilterDescriptor> filterConfigList = new ArrayList<>();
+  private final List<Map<String, Object>> outputConfigList = new ArrayList<>();
+
+  private final InputManager inputManager;
+  private final OutputManager outputManager;
+
+  public InputConfigHolder(LogSearchConfigLogFeeder config, InputManager 
inputManager, OutputManager outputManager, LogFeederProps logFeederProps) {
+    this.logFeederProps = logFeederProps;
+    this.config = config;
+    this.inputManager = inputManager;
+    this.outputManager = outputManager;
+  }
+
+  public LogFeederProps getLogFeederProps() {
+    return logFeederProps;
+  }
+
+  public List<InputDescriptor> getInputConfigList() {
+    return inputConfigList;
+  }
+
+  public List<FilterDescriptor> getFilterConfigList() {
+    return filterConfigList;
+  }
+
+  public List<Map<String, Object>> getOutputConfigList() {
+    return outputConfigList;
+  }
+
+  public InputManager getInputManager() {
+    return inputManager;
+  }
+
+  public OutputManager getOutputManager() {
+    return outputManager;
+  }
+
+  public LogSearchConfigLogFeeder getConfig() {
+    return config;
+  }
+}
diff --git 
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/InputConfigManager.java
similarity index 53%
rename from 
ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
rename to 
ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/InputConfigManager.java
index 61f726c..925bb65 100644
--- 
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
+++ 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/InputConfigManager.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -16,15 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.ambari.logfeeder.common;
+package org.apache.ambari.logfeeder.manager;
 
 import com.google.common.collect.Maps;
 import com.google.gson.reflect.TypeToken;
 import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.manager.operations.InputConfigHandler;
 import org.apache.ambari.logfeeder.input.InputSimulate;
 import org.apache.ambari.logfeeder.plugin.common.AliasUtil;
 import org.apache.ambari.logfeeder.plugin.common.MetricData;
-import org.apache.ambari.logfeeder.plugin.filter.Filter;
 import org.apache.ambari.logfeeder.plugin.input.Input;
 import org.apache.ambari.logfeeder.plugin.manager.InputManager;
 import org.apache.ambari.logfeeder.plugin.manager.OutputManager;
@@ -40,7 +40,6 @@ import 
org.apache.ambari.logsearch.config.json.model.inputconfig.impl.InputConfi
 import 
org.apache.ambari.logsearch.config.json.model.inputconfig.impl.InputDescriptorImpl;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.BooleanUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -48,7 +47,6 @@ import org.springframework.core.io.ClassPathResource;
 
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
-import javax.inject.Inject;
 import java.io.BufferedInputStream;
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -56,50 +54,115 @@ import java.lang.reflect.Type;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 /**
- * Initialize / close input and output managers and monitors input 
configuration changes.
+ * Facade class for input config operations (add / load / remove configs and 
start or close input monitoring)
  */
-public class ConfigHandler implements InputConfigMonitor {
-  private static final Logger logger = 
LogManager.getLogger(ConfigHandler.class);
+public class InputConfigManager implements InputConfigMonitor {
 
-  private final LogSearchConfigLogFeeder logSearchConfig;
+  private Logger logger = LogManager.getLogger(InputConfigManager.class);
 
-  @Inject
-  private InputManager inputManager;
-  @Inject
-  private OutputManager outputManager;
-  @Inject
-  private LogFeederProps logFeederProps;
+  private final InputConfigHandler inputConfigHandler;
+  private final LogSearchConfigLogFeeder logSearchConfig;
+  private final LogFeederProps logFeederProps;
+  private final InputConfigHolder inputConfigHolder;
+  private final boolean loadOutput;
 
   private final Map<String, Object> globalConfigs = new HashMap<>();
   private final List<String> globalConfigJsons = new ArrayList<>();
 
-  private final List<InputDescriptor> inputConfigList = new ArrayList<>();
-  private final List<FilterDescriptor> filterConfigList = new ArrayList<>();
-  private final List<Map<String, Object>> outputConfigList = new ArrayList<>();
-
   private boolean simulateMode = false;
 
-  public ConfigHandler(LogSearchConfigLogFeeder logSearchConfig) {
+  public InputConfigManager(LogSearchConfigLogFeeder logSearchConfig, 
InputManager inputManager,
+                            OutputManager outputManager, InputConfigHandler 
inputConfigHandler,
+                            LogFeederProps logFeederProps, boolean loadOutput) 
{
     this.logSearchConfig = logSearchConfig;
+    this.inputConfigHandler = inputConfigHandler;
+    this.logFeederProps = logFeederProps;
+    this.loadOutput = loadOutput;
+    this.inputConfigHolder = new InputConfigHolder(logSearchConfig, 
inputManager, outputManager, logFeederProps);
   }
 
   @PostConstruct
   public void init() throws Exception {
     loadConfigFiles();
     logSearchConfig.init(Maps.fromProperties(logFeederProps.getProperties()), 
logFeederProps.getClusterName());
-    loadOutputs();
+    inputConfigHandler.init(inputConfigHolder);
     simulateIfNeeded();
+    if (loadOutput) {
+      loadOutputs();
+    }
+    inputConfigHolder.getInputManager().init();
+    inputConfigHolder.getOutputManager().init();
+  }
+
+  @Override
+  public List<String> getGlobalConfigJsons() {
+    return this.globalConfigJsons;
+  }
+
+  @Override
+  public void loadInputConfigs(String serviceName, InputConfig inputConfig) 
throws Exception {
+    inputConfigHolder.getInputConfigList().clear();
+    inputConfigHolder.getFilterConfigList().clear();
+    inputConfigHolder.getInputConfigList().addAll(inputConfig.getInput());
+    inputConfigHolder.getFilterConfigList().addAll(inputConfig.getFilter());
+    if (simulateMode) {
+      InputSimulate.loadTypeToFilePath(inputConfigHolder.getInputConfigList());
+    } else {
+      inputConfigHandler.loadInputs(serviceName, inputConfigHolder, 
inputConfig);
+      inputConfigHandler.assignInputsToOutputs(serviceName, inputConfigHolder, 
inputConfig);
+    }
+    inputConfigHolder.getInputManager().startInputs(serviceName);
+  }
 
-    inputManager.init();
-    outputManager.init();
+  @Override
+  public void removeInputs(String serviceName) {
+    inputConfigHolder.getInputManager().removeInputsForService(serviceName);
+  }
+
+  public void cleanCheckPointFiles() {
+    
inputConfigHolder.getInputManager().getCheckpointHandler().cleanupCheckpoints();
+  }
+
+  public void logStats() {
+    inputConfigHolder.getInputManager().logStats();
+    inputConfigHolder.getOutputManager().logStats();
+  }
+
+  public void addMetrics(List<MetricData> metricsList) {
+    inputConfigHolder.getInputManager().addMetricsContainers(metricsList);
+    inputConfigHolder.getOutputManager().addMetricsContainers(metricsList);
+  }
+
+  @PreDestroy
+  public void close() {
+    inputConfigHolder.getInputManager().close();
+    inputConfigHolder.getOutputManager().close();
+    inputConfigHolder.getInputManager().checkInAll();
+  }
+
+  public Input getTestInput(InputConfig inputConfig, String logId) {
+    for (InputDescriptor inputDescriptor : inputConfig.getInput()) {
+      if (inputDescriptor.getType().equals(logId)) {
+        inputConfigHolder.getInputConfigList().add(inputDescriptor);
+        break;
+      }
+    }
+    if (inputConfigHolder.getInputConfigList().isEmpty()) {
+      throw new IllegalArgumentException("Log Id " + logId + " was not found 
in shipper configuriaton");
+    }
+
+    for (FilterDescriptor filterDescriptor : inputConfig.getFilter()) {
+      inputConfigHolder.getFilterConfigList().add(filterDescriptor);
+    }
+    inputConfigHandler.loadInputs("test", inputConfigHolder, inputConfig);
+    List<Input> inputList = 
inputConfigHolder.getInputManager().getInputList("test");
+
+    return inputList != null && inputList.size() == 1 ? inputList.get(0) : 
null;
   }
 
   private void loadConfigFiles() throws Exception {
@@ -121,13 +184,11 @@ public class ConfigHandler implements InputConfigMonitor {
 
   private List<String> getConfigFiles() {
     List<String> configFiles = new ArrayList<>();
-
     String logFeederConfigFilesProperty = logFeederProps.getConfigFiles();
     logger.info("logfeeder.config.files=" + logFeederConfigFilesProperty);
     if (logFeederConfigFilesProperty != null) {
       
configFiles.addAll(Arrays.asList(logFeederConfigFilesProperty.split(",")));
     }
-
     return configFiles;
   }
 
@@ -152,57 +213,8 @@ public class ConfigHandler implements InputConfigMonitor {
     }
   }
 
-  @Override
-  public void loadInputConfigs(String serviceName, InputConfig inputConfig) 
throws Exception {
-    inputConfigList.clear();
-    filterConfigList.clear();
-
-    inputConfigList.addAll(inputConfig.getInput());
-    filterConfigList.addAll(inputConfig.getFilter());
-
-    if (simulateMode) {
-      InputSimulate.loadTypeToFilePath(inputConfigList);
-    } else {
-      loadInputs(serviceName);
-      loadFilters(serviceName);
-      assignOutputsToInputs(serviceName);
-
-      inputManager.startInputs(serviceName);
-    }
-  }
-
-  @Override
-  public void removeInputs(String serviceName) {
-    inputManager.removeInputsForService(serviceName);
-  }
-
-  public Input getTestInput(InputConfig inputConfig, String logId) {
-    for (InputDescriptor inputDescriptor : inputConfig.getInput()) {
-      if (inputDescriptor.getType().equals(logId)) {
-        inputConfigList.add(inputDescriptor);
-        break;
-      }
-    }
-    if (inputConfigList.isEmpty()) {
-      throw new IllegalArgumentException("Log Id " + logId + " was not found 
in shipper configuriaton");
-    }
-
-    for (FilterDescriptor filterDescriptor : inputConfig.getFilter()) {
-//      if ("grok".equals(filterDescriptor.getFilter())) {
-//        // Thus ensure that the log entry passed will be parsed immediately
-//        ((FilterGrokDescriptor)filterDescriptor).setMultilinePattern(null);
-//      }
-      filterConfigList.add(filterDescriptor);
-    }
-    loadInputs("test");
-    loadFilters("test");
-    List<Input> inputList = inputManager.getInputList("test");
-
-    return inputList != null && inputList.size() == 1 ? inputList.get(0) : 
null;
-  }
-
   @SuppressWarnings("unchecked")
-  public void loadConfigs(String configData) throws Exception {
+  private void loadConfigs(String configData) throws Exception {
     Type type = new TypeToken<Map<String, Object>>() {}.getType();
     Map<String, Object> configMap = 
LogFeederUtil.getGson().fromJson(configData, type);
 
@@ -215,7 +227,7 @@ public class ConfigHandler implements InputConfigMonitor {
           break;
         case "output" :
           List<Map<String, Object>> outputConfig = (List<Map<String, Object>>) 
configMap.get(key);
-          outputConfigList.addAll(outputConfig);
+          inputConfigHolder.getOutputConfigList().addAll(outputConfig);
           break;
         default :
           logger.warn("Unknown config key: " + key);
@@ -223,39 +235,13 @@ public class ConfigHandler implements InputConfigMonitor {
     }
   }
 
-  @Override
-  public List<String> getGlobalConfigJsons() {
-    return globalConfigJsons;
-  }
-
-  private void simulateIfNeeded() throws Exception {
-    int simulatedInputNumber = 
logFeederProps.getInputSimulateConfig().getSimulateInputNumber();
-    if (simulatedInputNumber == 0)
-      return;
-
-    InputConfigImpl simulateInputConfig = new InputConfigImpl();
-    List<InputDescriptorImpl> inputConfigDescriptors = new ArrayList<>();
-    simulateInputConfig.setInput(inputConfigDescriptors);
-    simulateInputConfig.setFilter(new ArrayList<FilterDescriptorImpl>());
-    for (int i = 0; i < simulatedInputNumber; i++) {
-      InputDescriptorImpl inputDescriptor = new InputDescriptorImpl() {};
-      inputDescriptor.setSource("simulate");
-      inputDescriptor.setRowtype("service");
-      inputDescriptor.setAddFields(new HashMap<String, String>());
-      inputConfigDescriptors.add(inputDescriptor);
-    }
-
-    loadInputConfigs("Simulation", simulateInputConfig);
-
-    simulateMode = true;
-  }
-
   private void loadOutputs() {
-    for (Map<String, Object> map : outputConfigList) {
+    for (Map<String, Object> map : inputConfigHolder.getOutputConfigList()) {
       if (map == null) {
+        logger.warn("Output map is empty. Skipping...");
         continue;
       }
-      mergeBlocks(globalConfigs, map);
+      BlockMerger.mergeBlocks(globalConfigs, map);
 
       String value = (String) map.get("destination");
       if (StringUtils.isEmpty(value)) {
@@ -274,182 +260,32 @@ public class ConfigHandler implements InputConfigMonitor 
{
       // We will only check for is_enabled out here. Down below we will check 
whether this output is enabled for the input
       if (output.isEnabled()) {
         output.logConfigs();
-        outputManager.add(output);
+        inputConfigHolder.getOutputManager().add(output);
       } else {
         logger.info("Output is disabled. So ignoring it. " + 
output.getShortDescription());
       }
     }
   }
 
-  private void loadInputs(String serviceName) {
-    for (InputDescriptor inputDescriptor : inputConfigList) {
-      if (inputDescriptor == null) {
-        continue;
-      }
-
-      String source = (String) inputDescriptor.getSource();
-      if (StringUtils.isEmpty(source)) {
-        logger.error("Input block doesn't have source element");
-        continue;
-      }
-      Input input = (Input) AliasUtil.getClassInstance(source, 
AliasUtil.AliasType.INPUT);
-      if (input == null) {
-        logger.error("Input object could not be found");
-        continue;
-      }
-      input.setType(source);
-      input.setLogType(inputDescriptor.getType());
-      input.loadConfig(inputDescriptor);
-
-      if (input.isEnabled()) {
-        input.setOutputManager(outputManager);
-        input.setInputManager(inputManager);
-        inputManager.add(serviceName, input);
-        input.logConfigs();
-      } else {
-        logger.info("Input is disabled. So ignoring it. " + 
input.getShortDescription());
-      }
-    }
-  }
-
-  private void loadFilters(String serviceName) {
-    sortFilters();
-
-    List<Input> toRemoveInputList = new ArrayList<Input>();
-    for (Input input : inputManager.getInputList(serviceName)) {
-      for (FilterDescriptor filterDescriptor : filterConfigList) {
-        if (filterDescriptor == null) {
-          continue;
-        }
-        if (BooleanUtils.isFalse(filterDescriptor.isEnabled())) {
-          logger.debug("Ignoring filter " + filterDescriptor.getFilter() + " 
because it is disabled");
-          continue;
-        }
-        if (!input.isFilterRequired(filterDescriptor)) {
-          logger.debug("Ignoring filter " + filterDescriptor.getFilter() + " 
for input " + input.getShortDescription());
-          continue;
-        }
-
-        String value = filterDescriptor.getFilter();
-        if (StringUtils.isEmpty(value)) {
-          logger.error("Filter block doesn't have filter element");
-          continue;
-        }
-        Filter filter = (Filter) AliasUtil.getClassInstance(value, 
AliasUtil.AliasType.FILTER);
-        if (filter == null) {
-          logger.error("Filter object could not be found");
-          continue;
-        }
-        filter.loadConfig(filterDescriptor);
-        filter.setInput(input);
-
-        filter.setOutputManager(outputManager);
-        input.addFilter(filter);
-        filter.logConfigs();
-      }
-
-      if (input.getFirstFilter() == null) {
-        toRemoveInputList.add(input);
-      }
-    }
-
-    for (Input toRemoveInput : toRemoveInputList) {
-      logger.warn("There are no filters, we will ignore this input. " + 
toRemoveInput.getShortDescription());
-      inputManager.removeInput(toRemoveInput);
-    }
-  }
-
-  private void sortFilters() {
-    Collections.sort(filterConfigList, (o1, o2) -> {
-      Integer o1Sort = o1.getSortOrder();
-      Integer o2Sort = o2.getSortOrder();
-      if (o1Sort == null || o2Sort == null) {
-        return 0;
-      }
-
-      return o1Sort - o2Sort;
-    });
-  }
-
-  private void assignOutputsToInputs(String serviceName) {
-    Set<Output> usedOutputSet = new HashSet<Output>();
-    for (Input input : inputManager.getInputList(serviceName)) {
-      for (Output output : outputManager.getOutputs()) {
-        if (input.isOutputRequired(output)) {
-          usedOutputSet.add(output);
-          input.addOutput(output);
-        }
-      }
-    }
-
-    // In case of simulation copies of the output are added for each 
simulation instance, these must be added to the manager
-    for (Output output : InputSimulate.getSimulateOutputs()) {
-      output.setLogSearchConfig(logSearchConfig);
-      outputManager.add(output);
-      usedOutputSet.add(output);
-    }
-  }
-
-  @SuppressWarnings("unchecked")
-  private void mergeBlocks(Map<String, Object> fromMap, Map<String, Object> 
toMap) {
-    for (String key : fromMap.keySet()) {
-      Object objValue = fromMap.get(key);
-      if (objValue == null) {
-        continue;
-      }
-      if (objValue instanceof Map) {
-        Map<String, Object> globalFields = 
LogFeederUtil.cloneObject((Map<String, Object>) objValue);
-
-        Map<String, Object> localFields = (Map<String, Object>) toMap.get(key);
-        if (localFields == null) {
-          localFields = new HashMap<String, Object>();
-          toMap.put(key, localFields);
-        }
-
-        if (globalFields != null) {
-          for (String fieldKey : globalFields.keySet()) {
-            if (!localFields.containsKey(fieldKey)) {
-              localFields.put(fieldKey, globalFields.get(fieldKey));
-            }
-          }
-        }
-      }
-    }
+  private void simulateIfNeeded() throws Exception {
+    int simulatedInputNumber = 
inputConfigHolder.getLogFeederProps().getInputSimulateConfig().getSimulateInputNumber();
+    if (simulatedInputNumber == 0)
+      return;
 
-    // Let's add the rest of the top level fields if missing
-    for (String key : fromMap.keySet()) {
-      if (!toMap.containsKey(key)) {
-        toMap.put(key, fromMap.get(key));
-      }
+    InputConfigImpl simulateInputConfig = new InputConfigImpl();
+    List<InputDescriptorImpl> inputConfigDescriptors = new ArrayList<>();
+    simulateInputConfig.setInput(inputConfigDescriptors);
+    simulateInputConfig.setFilter(new ArrayList<FilterDescriptorImpl>());
+    for (int i = 0; i < simulatedInputNumber; i++) {
+      InputDescriptorImpl inputDescriptor = new InputDescriptorImpl() {};
+      inputDescriptor.setSource("simulate");
+      inputDescriptor.setRowtype("service");
+      inputDescriptor.setAddFields(new HashMap<String, String>());
+      inputConfigDescriptors.add(inputDescriptor);
     }
-  }
-
-  public void cleanCheckPointFiles() {
-    inputManager.getCheckpointHandler().cleanupCheckpoints();
-  }
-
-  public void logStats() {
-    inputManager.logStats();
-    outputManager.logStats();
-  }
 
-  public void addMetrics(List<MetricData> metricsList) {
-    inputManager.addMetricsContainers(metricsList);
-    outputManager.addMetricsContainers(metricsList);
-  }
-
-  @PreDestroy
-  public void close() {
-    inputManager.close();
-    outputManager.close();
-    inputManager.checkInAll();
-  }
-
-  public void setInputManager(InputManager inputManager) {
-    this.inputManager = inputManager;
-  }
+    loadInputConfigs("Simulation", simulateInputConfig);
 
-  public void setOutputManager(OutputManager outputManager) {
-    this.outputManager = outputManager;
+    simulateMode = true;
   }
 }
diff --git 
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/InputConfigHandler.java
 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/InputConfigHandler.java
new file mode 100644
index 0000000..2e80d0d
--- /dev/null
+++ 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/InputConfigHandler.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.manager.operations;
+
+import org.apache.ambari.logfeeder.manager.InputConfigHolder;;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig;
+
+/**
+ * Holds operations regarding input config handling. (init configs, load input 
configs and assign inputs to outputs)
+ */
+public interface InputConfigHandler {
+
+  /**
+   * Initialization step before loading inputs/filter/outputs
+   * @param inputConfigHolder object that holds input/filter/output 
configuration details
+   * @throws Exception error during initialization
+   */
+  void init(InputConfigHolder inputConfigHolder) throws Exception;
+
+  /**
+   * Step during input/filter configurations initialization
+   * @param serviceName group of input configs
+   * @param inputConfigHolder object that holds input/filter/output 
configuration details
+   * @param config input/filter config object
+   */
+  void loadInputs(String serviceName, InputConfigHolder inputConfigHolder, 
InputConfig config);
+
+  /**
+   * Assign inputs to outputs - after inputs/filters/outputs are loaded
+   * @param serviceName group of input configs
+   * @param inputConfigHolder object that holds input/filter/output 
configuration details
+   * @param config input/filter config object
+   * @throws Exception error during input/output assignment
+   */
+  void assignInputsToOutputs(String serviceName, InputConfigHolder 
inputConfigHolder, InputConfig config) throws Exception;
+
+}
diff --git 
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/CloudStorageInputConfigHandler.java
 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/CloudStorageInputConfigHandler.java
new file mode 100644
index 0000000..deb3a91
--- /dev/null
+++ 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/CloudStorageInputConfigHandler.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.manager.operations.impl;
+
+import org.apache.ambari.logfeeder.common.LogFeederConstants;
+import org.apache.ambari.logfeeder.conf.LogFeederMode;
+import org.apache.ambari.logfeeder.filter.FilterDummy;
+import org.apache.ambari.logfeeder.manager.operations.InputConfigHandler;
+import org.apache.ambari.logfeeder.manager.InputConfigHolder;
+import org.apache.ambari.logfeeder.plugin.common.AliasUtil;
+import org.apache.ambari.logfeeder.plugin.input.Input;
+import org.apache.ambari.logfeeder.plugin.output.Output;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig;
+import 
org.apache.ambari.logsearch.config.api.model.inputconfig.InputDescriptor;
+import 
org.apache.ambari.logsearch.config.api.model.inputconfig.InputSocketDescriptor;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+
+/**
+ * Holds input/filter/output operations in cloud Log Feeder mode.
+ */
+public class CloudStorageInputConfigHandler implements InputConfigHandler {
+
+  private static final Logger logger = 
LogManager.getLogger(CloudStorageInputConfigHandler.class);
+
+  @Override
+  public void init(InputConfigHolder inputConfigHolder) {
+    logger.info("Call init of cloud input config handler...");
+  }
+
+  @Override
+  public void loadInputs(String serviceName, InputConfigHolder 
inputConfigHolder, InputConfig inputConfig) {
+    for (InputDescriptor inputDescriptor : 
inputConfigHolder.getInputConfigList()) {
+      if (inputDescriptor == null) {
+        logger.warn("Input descriptor is smpty. Skipping...");
+        continue;
+      }
+      LogFeederMode mode = 
inputConfigHolder.getLogFeederProps().getCloudStorageMode();
+      if (inputDescriptor instanceof InputSocketDescriptor && 
LogFeederMode.HYBRID.equals(mode)) {
+        logger.info("Socket input is skipped (won't be sent to cloud storage) 
in hybrid mode");
+        continue;
+      }
+      String source = inputDescriptor.getSource();
+      if (StringUtils.isEmpty(source)) {
+        logger.error("Input block doesn't have source element");
+        continue;
+      }
+      Input input = (Input) AliasUtil.getClassInstance(source, 
AliasUtil.AliasType.INPUT);
+      if (input == null) {
+        logger.error("Input object could not be found");
+        continue;
+      }
+      input.setType(source);
+      input.setLogType(LogFeederConstants.CLOUD_PREFIX + 
inputDescriptor.getType());
+      input.loadConfig(inputDescriptor);
+      FilterDummy filter = new FilterDummy();
+      filter.setOutputManager(inputConfigHolder.getOutputManager());
+      input.setFirstFilter(filter);
+      input.setCloudInput(true);
+
+      if (input.isEnabled()) {
+        input.setOutputManager(inputConfigHolder.getOutputManager());
+        input.setInputManager(inputConfigHolder.getInputManager());
+        inputConfigHolder.getInputManager().add(serviceName, input);
+        logger.info("New cloud input object registered for service '{}': 
'{}'", serviceName, input.getLogType());
+        input.logConfigs();
+      } else {
+        logger.info("Input is disabled. So ignoring it. " + 
input.getShortDescription());
+      }
+    }
+  }
+
+  @Override
+  public void assignInputsToOutputs(String serviceName, InputConfigHolder 
inputConfigHolder, InputConfig config) {
+    for (Input input : 
inputConfigHolder.getInputManager().getInputList(serviceName)) {
+      List<Output> outputs = inputConfigHolder.getOutputManager().getOutputs();
+      for (Output output : outputs) {
+        input.addOutput(output);
+      }
+    }
+  }
+}
diff --git 
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java
 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java
new file mode 100644
index 0000000..44da631
--- /dev/null
+++ 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.manager.operations.impl;
+
+import org.apache.ambari.logfeeder.manager.operations.InputConfigHandler;
+import org.apache.ambari.logfeeder.input.InputSimulate;
+import org.apache.ambari.logfeeder.manager.InputConfigHolder;
+import org.apache.ambari.logfeeder.plugin.common.AliasUtil;
+import org.apache.ambari.logfeeder.plugin.filter.Filter;
+import org.apache.ambari.logfeeder.plugin.input.Input;
+import org.apache.ambari.logfeeder.plugin.output.Output;
+import 
org.apache.ambari.logsearch.config.api.model.inputconfig.FilterDescriptor;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig;
+import 
org.apache.ambari.logsearch.config.api.model.inputconfig.InputDescriptor;
+import org.apache.commons.lang.BooleanUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Holds input/filter/output operations in default Log Feeder mode.
+ */
+public class DefaultInputConfigHandler implements InputConfigHandler {
+
+  private static final Logger logger = 
LogManager.getLogger(DefaultInputConfigHandler.class);
+
+  @Override
+  public void init(InputConfigHolder inputConfigHolder) throws Exception {
+  }
+
+  @Override
+  public void loadInputs(String serviceName, InputConfigHolder 
inputConfigHolder, InputConfig inputConfig) {
+    loadInputs(serviceName, inputConfigHolder);
+    loadFilters(serviceName, inputConfigHolder);
+  }
+
+  @Override
+  public void assignInputsToOutputs(String serviceName, InputConfigHolder 
inputConfigHolder, InputConfig config) {
+    for (Input input : 
inputConfigHolder.getInputManager().getInputList(serviceName)) {
+      for (Output output : inputConfigHolder.getOutputManager().getOutputs()) {
+        if (input.isOutputRequired(output)) {
+          input.addOutput(output);
+        }
+      }
+    }
+
+    // In case of simulation copies of the output are added for each 
simulation instance, these must be added to the manager
+    for (Output output : InputSimulate.getSimulateOutputs()) {
+      output.setLogSearchConfig(inputConfigHolder.getConfig());
+      inputConfigHolder.getOutputManager().add(output);
+    }
+  }
+
+  private void loadInputs(String serviceName, InputConfigHolder 
inputConfigHolder) {
+    for (InputDescriptor inputDescriptor : 
inputConfigHolder.getInputConfigList()) {
+      if (inputDescriptor == null) {
+        logger.warn("Input descriptor is smpty. Skipping...");
+        continue;
+      }
+
+      String source = inputDescriptor.getSource();
+      if (StringUtils.isEmpty(source)) {
+        logger.error("Input block doesn't have source element");
+        continue;
+      }
+      Input input = (Input) AliasUtil.getClassInstance(source, 
AliasUtil.AliasType.INPUT);
+      if (input == null) {
+        logger.error("Input object could not be found");
+        continue;
+      }
+      input.setType(source);
+      input.setLogType(inputDescriptor.getType());
+      input.loadConfig(inputDescriptor);
+
+      if (input.isEnabled()) {
+        input.setOutputManager(inputConfigHolder.getOutputManager());
+        input.setInputManager(inputConfigHolder.getInputManager());
+        inputConfigHolder.getInputManager().add(serviceName, input);
+        logger.info("New input object registered for service '{}': '{}'", 
serviceName, input.getLogType());
+        input.logConfigs();
+      } else {
+        logger.info("Input is disabled. So ignoring it. " + 
input.getShortDescription());
+      }
+    }
+  }
+
+  private void loadFilters(String serviceName, InputConfigHolder 
inputConfigHolder) {
+    sortFilters(inputConfigHolder);
+
+    List<Input> toRemoveInputList = new ArrayList<>();
+    for (Input input : 
inputConfigHolder.getInputManager().getInputList(serviceName)) {
+      for (FilterDescriptor filterDescriptor : 
inputConfigHolder.getFilterConfigList()) {
+        if (filterDescriptor == null) {
+          logger.warn("Filter descriptor is smpty. Skipping...");
+          continue;
+        }
+        if (BooleanUtils.isFalse(filterDescriptor.isEnabled())) {
+          logger.debug("Ignoring filter " + filterDescriptor.getFilter() + " 
because it is disabled");
+          continue;
+        }
+        if (!input.isFilterRequired(filterDescriptor)) {
+          logger.debug("Ignoring filter " + filterDescriptor.getFilter() + " 
for input " + input.getShortDescription());
+          continue;
+        }
+
+        String value = filterDescriptor.getFilter();
+        if (StringUtils.isEmpty(value)) {
+          logger.error("Filter block doesn't have filter element");
+          continue;
+        }
+        Filter filter = (Filter) AliasUtil.getClassInstance(value, 
AliasUtil.AliasType.FILTER);
+        if (filter == null) {
+          logger.error("Filter object could not be found");
+          continue;
+        }
+        filter.loadConfig(filterDescriptor);
+        filter.setInput(input);
+
+        filter.setOutputManager(inputConfigHolder.getOutputManager());
+        input.addFilter(filter);
+        filter.logConfigs();
+      }
+
+      if (input.getFirstFilter() == null) {
+        toRemoveInputList.add(input);
+      }
+    }
+
+    for (Input toRemoveInput : toRemoveInputList) {
+      logger.warn("There are no filters, we will ignore this input. " + 
toRemoveInput.getShortDescription());
+      inputConfigHolder.getInputManager().removeInput(toRemoveInput);
+    }
+  }
+
+  private void sortFilters(InputConfigHolder inputConfigHolder) {
+    Collections.sort(inputConfigHolder.getFilterConfigList(), (o1, o2) -> {
+      Integer o1Sort = o1.getSortOrder();
+      Integer o2Sort = o2.getSortOrder();
+      if (o1Sort == null || o2Sort == null) {
+        return 0;
+      }
+
+      return o1Sort - o2Sort;
+    });
+  }
+}
diff --git 
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/StatsLogger.java
 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/StatsLogger.java
index e72fd43..bc1510a 100644
--- 
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/StatsLogger.java
+++ 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/StatsLogger.java
@@ -18,7 +18,7 @@
  */
 package org.apache.ambari.logfeeder.metrics;
 
-import org.apache.ambari.logfeeder.common.ConfigHandler;
+import org.apache.ambari.logfeeder.manager.InputConfigManager;
 import org.apache.ambari.logfeeder.plugin.common.MetricData;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -36,15 +36,15 @@ public class StatsLogger extends Thread {
 
   private long lastCheckPointCleanedMS = 0;
 
-  @Inject
-  private ConfigHandler configHandler;
+  private final InputConfigManager inputConfigManager;
 
   @Inject
   private MetricsManager metricsManager;
 
-  public StatsLogger() {
-    super("statLogger");
+  public StatsLogger(String name, InputConfigManager inputConfigManager) {
+    super(name);
     setDaemon(true);
+    this.inputConfigManager = inputConfigManager;
   }
 
   @PostConstruct
@@ -68,16 +68,16 @@ public class StatsLogger extends Thread {
 
       if (System.currentTimeMillis() > (lastCheckPointCleanedMS + 
CHECKPOINT_CLEAN_INTERVAL_MS)) {
         lastCheckPointCleanedMS = System.currentTimeMillis();
-        configHandler.cleanCheckPointFiles();
+        inputConfigManager.cleanCheckPointFiles();
       }
     }
   }
 
   private void logStats() {
-    configHandler.logStats();
+    inputConfigManager.logStats();
     if (metricsManager.isMetricsEnabled()) {
       List<MetricData> metricsList = new ArrayList<MetricData>();
-      configHandler.addMetrics(metricsList);
+      inputConfigManager.addMetrics(metricsList);
       metricsManager.useMetrics(metricsList);
     }
   }
diff --git 
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManagerImpl.java
 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManagerImpl.java
index 68db96a..afe1c0a 100644
--- 
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManagerImpl.java
+++ 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManagerImpl.java
@@ -72,6 +72,7 @@ public class OutputManagerImpl extends OutputManager {
   @SuppressWarnings("unchecked")
   @Override
   public void init() throws Exception {
+    logger.info("Called init with default output manager.");
     for (Output output : outputs) {
       output.init(logFeederProps);
     }
diff --git 
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageFactory.java
 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageFactory.java
new file mode 100644
index 0000000..871ae93
--- /dev/null
+++ 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.output.cloud;
+
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
+
+/**
+ * Class for creating the right cloud storage outputs based on global Log 
Feeder configurations
+ * TODO !!!
+ */
+public class CloudStorageFactory {
+
+  public static CloudStorageOutput createCloudStorageOutput(LogFeederProps 
properties) {
+    return new HDFSOutput();
+  }
+}
diff --git 
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutput.java
 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutput.java
new file mode 100644
index 0000000..561b141
--- /dev/null
+++ 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutput.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.output.cloud;
+
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.plugin.input.InputMarker;
+import org.apache.ambari.logfeeder.plugin.output.Output;
+
+/**
+ * Class to handle common operations for cloud storage outputs
+ * TODO !!!
+ */
+public abstract class CloudStorageOutput extends Output<LogFeederProps, 
InputMarker> {
+}
diff --git 
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutputManager.java
 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutputManager.java
new file mode 100644
index 0000000..4994eb7
--- /dev/null
+++ 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutputManager.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.output.cloud;
+
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.plugin.common.MetricData;
+import org.apache.ambari.logfeeder.plugin.input.InputMarker;
+import org.apache.ambari.logfeeder.plugin.manager.OutputManager;
+import org.apache.ambari.logfeeder.plugin.output.Output;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Handle output operations for sending cloud inputs to a cloud storage 
destination
+ * TODO !!!
+ */
+public class CloudStorageOutputManager extends OutputManager {
+
+  private static final Logger logger = 
LogManager.getLogger(CloudStorageOutputManager.class);
+
+  @Inject
+  private LogFeederProps logFeederProps;
+
+  private CloudStorageOutput storageOutput = null;
+
+  private List<Output> outputList = new ArrayList<>();
+
+  @Override
+  public void write(Map<String, Object> jsonObj, InputMarker marker) {
+    // TODO: make sense to implement this if we will support filters before 
calling cloud outputs
+  }
+
+  @Override
+  public void write(String line, InputMarker marker) {
+    logger.info("Output: {}", line);
+    try {
+      storageOutput.write(line, marker);
+    } catch (Exception e) {
+
+    }
+  }
+
+  @Override
+  public void copyFile(File file, InputMarker marker) {
+
+  }
+
+  @Override
+  public void add(Output output) {
+    this.outputList.add(output);
+  }
+
+  @Override
+  public List<Output> getOutputs() {
+    return this.outputList;
+  }
+
+  @Override
+  public void init() throws Exception {
+    logger.info("Called init with cloud storage output manager.");
+    storageOutput = 
CloudStorageFactory.createCloudStorageOutput(logFeederProps);
+    storageOutput.init(logFeederProps);
+    add(storageOutput);
+  }
+
+  @Override
+  public void close() {
+
+  }
+
+  @Override
+  public void logStats() {
+
+  }
+
+  @Override
+  public void addMetricsContainers(List<MetricData> metricsList) {
+
+  }
+}
diff --git 
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/HDFSOutput.java
 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/HDFSOutput.java
new file mode 100644
index 0000000..24edb41
--- /dev/null
+++ 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/HDFSOutput.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.output.cloud;
+
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.plugin.input.InputMarker;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.File;
+
+/**
+ * HDFS cloud storage output (on-prem)
+ * TODO !!!
+ */
+public class HDFSOutput extends CloudStorageOutput {
+
+  private Logger logger = LogManager.getLogger(HDFSOutput.class);
+
+  @Override
+  public String getOutputType() {
+    return null;
+  }
+
+  @Override
+  public void copyFile(File inputFile, InputMarker inputMarker) throws 
Exception {
+  }
+
+  @Override
+  public void write(String line, InputMarker inputMarker) throws Exception {
+    inputMarker.getInput().checkIn(inputMarker);
+  }
+
+  @Override
+  public Long getPendingCount() {
+    return null;
+  }
+
+  @Override
+  public String getWriteBytesMetricName() {
+    return null;
+  }
+
+  @Override
+  public void init(LogFeederProps logFeederProperties) throws Exception {
+    logger.info("Initialize on-prem HDFS output");
+  }
+
+  @Override
+  public String getShortDescription() {
+    return null;
+  }
+
+  @Override
+  public String getStatMetricName() {
+    return null;
+  }
+}
diff --git a/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties 
b/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties
index 0fb1058..06c95f3 100644
--- a/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties
+++ b/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties
@@ -38,3 +38,6 @@ logfeeder.tmp.dir=${LOGFEEDER_RELATIVE_LOCATION:}target/tmp
 
 #logfeeder.configs.local.enabled=true
 #logfeeder.configs.filter.solr.enabled=true
+#logfeeder.docker.registry.enabled=true
+logfeeder.cloud.storage.mode=default
+#logfeeder.cloud.storage.mode=cloud
diff --git 
a/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputS3FileTest.java
 
b/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputS3FileTest.java
index 63799e3..6674be1 100644
--- 
a/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputS3FileTest.java
+++ 
b/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputS3FileTest.java
@@ -21,7 +21,6 @@ package org.apache.ambari.logfeeder.output;
 import org.apache.ambari.logfeeder.conf.LogFeederProps;
 import org.apache.ambari.logfeeder.output.spool.LogSpoolerContext;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import java.io.File;
@@ -64,7 +63,6 @@ public class OutputS3FileTest {
     }
   }
 
-  @Ignore("Until EasyMock 3.7 upgrade - waiting for release")
   @Test
   public void shouldRolloverWhenSufficientSizeIsReached() throws Exception {
 
@@ -83,7 +81,6 @@ public class OutputS3FileTest {
     assertTrue(outputS3File.shouldRollover(logSpoolerContext));
   }
 
-  @Ignore("Until EasyMock 3.7 upgrade - waiting for release")
   @Test
   public void shouldNotRolloverBeforeSufficientSizeIsReached() throws 
Exception {
     String thresholdSize = Long.toString(15 * 1024 * 1024L);
diff --git 
a/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3UploaderTest.java
 
b/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3UploaderTest.java
index facc77f..e070545 100644
--- 
a/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3UploaderTest.java
+++ 
b/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3UploaderTest.java
@@ -37,7 +37,6 @@ public class S3UploaderTest {
   public static final String ACCESS_KEY_VALUE = "accessKeyValue";
   public static final String SECRET_KEY_VALUE = "secretKeyValue";
 
-  @Ignore("Until EasyMock 3.7 upgrade - waiting for release")
   @Test
   public void shouldUploadToS3ToRightBucket() {
     File fileToUpload = mock(File.class);
@@ -66,7 +65,6 @@ public class S3UploaderTest {
     
assertEquals("test_path/hdfs_namenode/hdfs_namenode.log.123343493473948.gz", 
resolvedPath);
   }
 
-  @Ignore("Until EasyMock 3.7 upgrade - waiting for release")
   @Test
   public void shouldCleanupLocalFilesOnSuccessfulUpload() {
     File fileToUpload = mock(File.class);
@@ -96,7 +94,6 @@ public class S3UploaderTest {
     verify(compressedFile);
   }
 
-  @Ignore("Until EasyMock 3.7 upgrade - waiting for release")
   @Test
   public void shouldNotCleanupUncompressedFileIfNotRequired() {
     File fileToUpload = mock(File.class);
@@ -124,7 +121,6 @@ public class S3UploaderTest {
     verify(compressedFile);
   }
 
-  @Ignore("Until EasyMock 3.7 upgrade - waiting for release")
   @Test
   public void shouldExpandVariablesInPath() {
     File fileToUpload = mock(File.class);
diff --git 
a/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerTest.java
 
b/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerTest.java
index 4a7b9b0..2cfe9ff 100644
--- 
a/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerTest.java
+++ 
b/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerTest.java
@@ -22,7 +22,6 @@ import org.easymock.EasyMockRule;
 import org.easymock.LogicalOperator;
 import org.easymock.Mock;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -56,7 +55,6 @@ public class LogSpoolerTest {
     spoolDirectory = testFolder.getRoot().getAbsolutePath();
   }
 
-  @Ignore("Until EasyMock 3.7 upgrade - waiting for release")
   @Test
   public void shouldSpoolEventToFile() {
     final PrintWriter spoolWriter = mock(PrintWriter.class);
@@ -93,7 +91,6 @@ public class LogSpoolerTest {
     return mockFile;
   }
 
-  @Ignore("Until EasyMock 3.7 upgrade - waiting for release")
   @Test
   public void shouldIncrementSpooledEventsCount() {
 
@@ -126,7 +123,6 @@ public class LogSpoolerTest {
     verify(rolloverCondition);
   }
 
-  @Ignore("Until EasyMock 3.7 upgrade - waiting for release")
   @Test
   public void shouldCloseCurrentSpoolFileOnRollOver() {
     final PrintWriter spoolWriter = mock(PrintWriter.class);
@@ -161,7 +157,6 @@ public class LogSpoolerTest {
     verify(spoolWriter);
   }
 
-  @Ignore("Until EasyMock 3.7 upgrade - waiting for release")
   @Test
   public void shouldReinitializeFileOnRollover() {
     final PrintWriter spoolWriter1 = mock(PrintWriter.class);
@@ -217,7 +212,6 @@ public class LogSpoolerTest {
     verify(spoolWriter1, spoolWriter2, rolloverCondition);
   }
 
-  @Ignore("Until EasyMock 3.7 upgrade - waiting for release")
   @Test
   public void shouldCallRolloverHandlerOnRollover() {
     final PrintWriter spoolWriter = mock(PrintWriter.class);
@@ -255,7 +249,6 @@ public class LogSpoolerTest {
   // Rollover twice - the second rollover should work if the 
"rolloverInProgress"
   // flag is being reset correctly. Third file expectations being setup due
   // to auto-initialization.
-  @Ignore("Until EasyMock 3.7 upgrade - waiting for release")
   @Test
   public void shouldResetRolloverInProgressFlag() {
     final PrintWriter spoolWriter1 = mock(PrintWriter.class);
@@ -329,7 +322,6 @@ public class LogSpoolerTest {
     verify(spoolWriter1, spoolWriter2, rolloverCondition);
   }
 
-  @Ignore("Until EasyMock 3.7 upgrade - waiting for release")
   @Test
   public void shouldNotRolloverZeroLengthFiles() {
     final PrintWriter spoolWriter = mock(PrintWriter.class);
diff --git 
a/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/LogSearch.java
 
b/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/LogSearch.java
index 92c2b32..54ebf45 100644
--- 
a/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/LogSearch.java
+++ 
b/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/LogSearch.java
@@ -19,6 +19,7 @@
 package org.apache.ambari.logsearch;
 
 import org.springframework.boot.Banner;
+import org.springframework.boot.WebApplicationType;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import 
org.springframework.boot.autoconfigure.data.rest.RepositoryRestMvcAutoConfiguration;
 import 
org.springframework.boot.autoconfigure.data.solr.SolrRepositoriesAutoConfiguration;
@@ -47,7 +48,7 @@ public class LogSearch {
     new SpringApplicationBuilder(LogSearch.class)
       .bannerMode(Banner.Mode.OFF)
       .listeners(new ApplicationPidFileWriter(pidFile))
-      .web(true)
+      .web(WebApplicationType.SERVLET)
       .run(args);
   }
 
diff --git a/docker/test-config/logfeeder/logfeeder.properties 
b/docker/test-config/logfeeder/logfeeder.properties
index 8371170..ffdb061 100644
--- a/docker/test-config/logfeeder/logfeeder.properties
+++ b/docker/test-config/logfeeder/logfeeder.properties
@@ -34,4 +34,5 @@ logfeeder.solr.core.config.name=history
 #logfeeder.solr.urls=http://solr:8983/solr
 #logfeeder.configs.local.enabled=true
 #logfeeder.configs.filter.solr.enabled=true
-#logfeeder.configs.filter.zk.enabled=true
\ No newline at end of file
+#logfeeder.configs.filter.zk.enabled=true
+#logfeeder.cloud.storage.mode=hybrid
\ No newline at end of file

Reply via email to