AMBARI-22639. Log Feeder refactor: integrate with spring boot (oleewere)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/31e8e55a Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/31e8e55a Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/31e8e55a Branch: refs/heads/branch-feature-AMBARI-21674 Commit: 31e8e55a900e0dd4915810302da22991e7cc54e4 Parents: a3d4c3d Author: Oliver Szabo <oleew...@gmail.com> Authored: Tue Dec 12 22:34:09 2017 +0100 Committer: Oliver Szabo <oleew...@gmail.com> Committed: Wed Dec 13 22:44:51 2017 +0100 ---------------------------------------------------------------------- .../config/api/LogSearchConfigFactory.java | 49 +- .../config/api/LogSearchConfigLogFeeder.java | 3 +- .../ambari-logsearch-logfeeder/.gitignore | 1 + .../ambari-logsearch-logfeeder/pom.xml | 62 ++- .../org/apache/ambari/logfeeder/LogFeeder.java | 167 +------ .../ambari/logfeeder/LogFeederCommandLine.java | 28 +- .../ambari/logfeeder/common/ConfigHandler.java | 47 +- .../ambari/logfeeder/common/ConfigItem.java | 8 +- .../logfeeder/common/LogEntryParseTester.java | 3 +- .../logfeeder/common/LogFeederConstants.java | 50 ++ .../logfeeder/conf/ApplicationConfig.java | 107 ++++ .../logfeeder/conf/InputSimulateConfig.java | 154 ++++++ .../logfeeder/conf/LogEntryCacheConfig.java | 118 +++++ .../ambari/logfeeder/conf/LogFeederProps.java | 226 +++++++++ .../logfeeder/conf/LogFeederSecurityConfig.java | 189 +++++++ .../logfeeder/conf/MetricsCollectorConfig.java | 113 +++++ .../apache/ambari/logfeeder/filter/Filter.java | 7 +- .../ambari/logfeeder/filter/FilterGrok.java | 7 +- .../ambari/logfeeder/filter/FilterKeyValue.java | 5 +- .../logfeeder/input/AbstractInputFile.java | 11 +- .../apache/ambari/logfeeder/input/Input.java | 25 +- .../logfeeder/input/InputConfigUploader.java | 82 +-- .../ambari/logfeeder/input/InputManager.java | 28 +- .../ambari/logfeeder/input/InputSimulate.java | 44 +- .../logfeeder/loglevelfilter/FilterLogData.java | 73 --- 
.../loglevelfilter/LogLevelFilterHandler.java | 89 +++- .../logfeeder/metrics/LogFeederAMSClient.java | 22 +- .../logfeeder/metrics/MetricsManager.java | 16 +- .../ambari/logfeeder/metrics/StatsLogger.java | 83 ++++ .../ambari/logfeeder/output/OutputFile.java | 5 +- .../ambari/logfeeder/output/OutputHDFSFile.java | 8 +- .../ambari/logfeeder/output/OutputKafka.java | 5 +- .../ambari/logfeeder/output/OutputManager.java | 36 +- .../ambari/logfeeder/output/OutputS3File.java | 8 +- .../ambari/logfeeder/output/OutputSolr.java | 10 +- .../logfeeder/util/LogFeederPropertiesUtil.java | 498 ------------------- .../apache/ambari/logfeeder/util/SSLUtil.java | 134 ----- .../shipper-conf/input.config-sample.json | 2 +- .../src/main/resources/log4j.xml | 15 +- .../src/main/resources/logfeeder.properties | 18 +- .../ambari/logfeeder/filter/FilterGrokTest.java | 3 +- .../ambari/logfeeder/filter/FilterJSONTest.java | 3 +- .../logfeeder/filter/FilterKeyValueTest.java | 3 +- .../ambari/logfeeder/input/InputFileTest.java | 11 +- .../logfeeder/input/InputManagerTest.java | 12 +- .../logconfig/LogConfigHandlerTest.java | 148 ------ .../logfeeder/metrics/MetricsManagerTest.java | 7 - .../logfeeder/output/OutputKafkaTest.java | 7 +- .../logfeeder/output/OutputManagerTest.java | 32 +- .../logfeeder/output/OutputS3FileTest.java | 9 +- .../ambari/logfeeder/output/OutputSolrTest.java | 10 +- 51 files changed, 1542 insertions(+), 1259 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/31e8e55a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactory.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactory.java 
b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactory.java index a84a97b..c74fad3 100644 --- a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactory.java +++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactory.java @@ -37,12 +37,13 @@ public class LogSearchConfigFactory { * @param properties The properties of the component for which the configuration is created. If the properties contain the * "logsearch.config.class" entry than the class defined there would be used instead of the default class. * @param defaultClass The default configuration class to use if not specified otherwise. + * @param init initialize the properties and zookeeper client * @return The Log Search Configuration instance. * @throws Exception Throws exception if the defined class does not implement LogSearchConfigServer, or doesn't have an empty * constructor, or throws an exception in it's init method. */ public static LogSearchConfigServer createLogSearchConfigServer(Map<String, String> properties, - Class<? extends LogSearchConfigServer> defaultClass) throws Exception { + Class<? extends LogSearchConfigServer> defaultClass, boolean init) throws Exception { try { LogSearchConfigServer logSearchConfig = null; String configClassName = properties.get("logsearch.config.server.class"); @@ -57,8 +58,9 @@ public class LogSearchConfigFactory { } else { logSearchConfig = defaultClass.newInstance(); } - - logSearchConfig.init(properties); + if (init) { + logSearchConfig.init(properties); + } return logSearchConfig; } catch (Exception e) { LOG.error("Could not initialize logsearch config.", e); @@ -74,12 +76,13 @@ public class LogSearchConfigFactory { * "logsearch.config.class" entry than the class defined there would be used instead of the default class. * @param clusterName The name of the cluster. 
* @param defaultClass The default configuration class to use if not specified otherwise. + * @param init initialize the properties and zookeeper client * @return The Log Search Configuration instance. * @throws Exception Throws exception if the defined class does not implement LogSearchConfigLogFeeder, or doesn't have an empty * constructor, or throws an exception in it's init method. */ public static LogSearchConfigLogFeeder createLogSearchConfigLogFeeder(Map<String, String> properties, String clusterName, - Class<? extends LogSearchConfigLogFeeder> defaultClass) throws Exception { + Class<? extends LogSearchConfigLogFeeder> defaultClass, boolean init) throws Exception { try { LogSearchConfigLogFeeder logSearchConfig = null; String configClassName = properties.get("logsearch.config.logfeeder.class"); @@ -94,12 +97,46 @@ public class LogSearchConfigFactory { } else { logSearchConfig = defaultClass.newInstance(); } - - logSearchConfig.init(properties, clusterName); + if (init) { + logSearchConfig.init(properties, clusterName); + } return logSearchConfig; } catch (Exception e) { LOG.error("Could not initialize logsearch config.", e); throw e; } } + + /** + * Creates a Log Search Configuration instance for the Log Search Server that implements + * {@link org.apache.ambari.logsearch.config.api.LogSearchConfigServer}. + * + * @param properties The properties of the component for which the configuration is created. If the properties contain the + * "logsearch.config.class" entry than the class defined there would be used instead of the default class. + * @param defaultClass The default configuration class to use if not specified otherwise. + * @return The Log Search Configuration instance. + * @throws Exception Throws exception if the defined class does not implement LogSearchConfigServer, or doesn't have an empty + * constructor, or throws an exception in it's init method. 
+ */ + public static LogSearchConfigServer createLogSearchConfigServer(Map<String, String> properties, + Class<? extends LogSearchConfigServer> defaultClass) throws Exception { + return createLogSearchConfigServer(properties, defaultClass, true); + } + + /** + * Creates a Log Search Configuration instance for the Log Feeder that implements + * {@link org.apache.ambari.logsearch.config.api.LogSearchConfigLogFeeder}. + * + * @param properties The properties of the component for which the configuration is created. If the properties contain the + * "logsearch.config.logfeeder.class" entry then the class defined there is used instead of the default class. + * @param clusterName The name of the cluster. + * @param defaultClass The default configuration class to use if not specified otherwise. + * @return The Log Search Configuration instance. + * @throws Exception Throws exception if the defined class does not implement LogSearchConfigLogFeeder, or doesn't have an empty + * constructor, or throws an exception in its init method. + */ + public static LogSearchConfigLogFeeder createLogSearchConfigLogFeeder(Map<String, String> properties, String clusterName, + Class<? 
extends LogSearchConfigLogFeeder> defaultClass) throws Exception { + return createLogSearchConfigLogFeeder(properties, clusterName, defaultClass, true); + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/31e8e55a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigLogFeeder.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigLogFeeder.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigLogFeeder.java index 6ed36fd..1387515 100644 --- a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigLogFeeder.java +++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigLogFeeder.java @@ -68,8 +68,7 @@ public interface LogSearchConfigLogFeeder extends LogSearchConfig { /** * Saves the properties of an Output Solr. - * - * @param type The type of the Output Solr. + * * @param outputConfigMonitors The monitors which want to watch the output config changes. 
* @throws Exception */ http://git-wip-us.apache.org/repos/asf/ambari/blob/31e8e55a/ambari-logsearch/ambari-logsearch-logfeeder/.gitignore ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/.gitignore b/ambari-logsearch/ambari-logsearch-logfeeder/.gitignore new file mode 100644 index 0000000..7b00482 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/.gitignore @@ -0,0 +1 @@ +*.pid \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/31e8e55a/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml b/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml index 01710bf..005af15 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml +++ b/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml @@ -33,6 +33,8 @@ <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <spring.version>4.3.10.RELEASE</spring.version> + <spring-boot.version>1.5.6.RELEASE</spring-boot.version> </properties> <dependencies> @@ -67,11 +69,6 @@ <scope>test</scope> </dependency> <dependency> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - <version>1.2.17</version> - </dependency> - <dependency> <groupId>io.thekraken</groupId> <artifactId>grok</artifactId> <version>0.1.4</version> @@ -102,16 +99,6 @@ <version>18.0</version> </dependency> <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> - <version>1.7.20</version> - </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - <version>1.7.20</version> - </dependency> - <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-csv</artifactId> <version>1.2</version> @@ -152,6 +139,10 @@ <version>${hadoop.version}</version> <exclusions> <exclusion> + 
<artifactId>slf4j-log4j12</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + <exclusion> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> </exclusion> @@ -179,6 +170,32 @@ <artifactId>commons-io</artifactId> <version>${common.io.version}</version> </dependency> + <dependency> + <groupId>javax.inject</groupId> + <artifactId>javax.inject</artifactId> + <version>1</version> + </dependency> + <dependency> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-starter</artifactId> + <version>${spring-boot.version}</version> + <exclusions> + <exclusion> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-starter-logging</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-starter-log4j</artifactId> + <version>1.3.8.RELEASE</version> + </dependency> + <dependency> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-configuration-processor</artifactId> + <version>${spring-boot.version}</version> + </dependency> </dependencies> <build> <finalName>LogFeeder</finalName> @@ -225,6 +242,21 @@ </arguments> </configuration> </plugin> + <plugin> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-maven-plugin</artifactId> + <version>${spring-boot.version}</version> + <configuration> + <classifier>exec</classifier> + </configuration> + <executions> + <execution> + <goals> + <goal>repackage</goal> + </goals> + </execution> + </executions> + </plugin> <!-- copy-dependencies --> <plugin> <groupId>org.apache.maven.plugins</groupId> http://git-wip-us.apache.org/repos/asf/ambari/blob/31e8e55a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java index 5114743..2d31e5a 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java @@ -16,146 +16,41 @@ * specific language governing permissions and limitations * under the License. */ - package org.apache.ambari.logfeeder; +import com.google.gson.GsonBuilder; +import org.apache.ambari.logfeeder.common.LogEntryParseTester; +import org.apache.commons.io.FileUtils; +import org.apache.log4j.LogManager; +import org.springframework.boot.Banner; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.builder.SpringApplicationBuilder; +import org.springframework.boot.system.ApplicationPidFileWriter; + import java.io.File; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; import java.util.Map; -import org.apache.ambari.logfeeder.common.ConfigHandler; -import org.apache.ambari.logfeeder.common.LogEntryParseTester; -import org.apache.ambari.logsearch.config.api.LogSearchConfigFactory; -import org.apache.ambari.logsearch.config.api.LogSearchConfigLogFeeder; -import org.apache.ambari.logsearch.config.zookeeper.LogSearchConfigLogFeederZK; -import org.apache.commons.io.FileUtils; -import org.apache.ambari.logfeeder.input.InputConfigUploader; -import org.apache.ambari.logfeeder.loglevelfilter.LogLevelFilterHandler; -import org.apache.ambari.logfeeder.metrics.MetricData; -import org.apache.ambari.logfeeder.metrics.MetricsManager; -import org.apache.ambari.logfeeder.util.LogFeederPropertiesUtil; -import org.apache.ambari.logfeeder.util.SSLUtil; -import com.google.common.collect.Maps; -import com.google.gson.GsonBuilder; - -import org.apache.hadoop.util.ShutdownHookManager; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; 
- +@SpringBootApplication( + scanBasePackages = {"org.apache.ambari.logfeeder"} +) public class LogFeeder { - private static final Logger LOG = Logger.getLogger(LogFeeder.class); - - private static final int LOGFEEDER_SHUTDOWN_HOOK_PRIORITY = 30; - private static final int CHECKPOINT_CLEAN_INTERVAL_MS = 24 * 60 * 60 * 60 * 1000; // 24 hours - - private final LogFeederCommandLine cli; - - private ConfigHandler configHandler; - private LogSearchConfigLogFeeder config; - - private MetricsManager metricsManager = new MetricsManager(); - - private long lastCheckPointCleanedMS = 0; - private Thread statLoggerThread = null; - - private LogFeeder(LogFeederCommandLine cli) { - this.cli = cli; - } - - public void run() { - try { - init(); - monitor(); - } catch (Throwable t) { - LOG.fatal("Caught exception in main.", t); - System.exit(1); - } - } - - private void init() throws Throwable { - long startTime = System.currentTimeMillis(); - - SSLUtil.ensureStorePasswords(); - - config = LogSearchConfigFactory.createLogSearchConfigLogFeeder(Maps.fromProperties(LogFeederPropertiesUtil.getProperties()), - LogFeederPropertiesUtil.getClusterName(), LogSearchConfigLogFeederZK.class); - configHandler = new ConfigHandler(config); - configHandler.init(); - LogLevelFilterHandler.init(config); - InputConfigUploader.load(config); - config.monitorInputConfigChanges(configHandler, new LogLevelFilterHandler(), LogFeederPropertiesUtil.getClusterName()); - - metricsManager.init(); - - LOG.debug("=============="); - - long endTime = System.currentTimeMillis(); - LOG.info("Took " + (endTime - startTime) + " ms to initialize"); - } - - private class JVMShutdownHook extends Thread { - - public void run() { - try { - LOG.info("Processing is shutting down."); - - configHandler.close(); - config.close(); - logStats(); - - LOG.info("LogSearch is exiting."); - } catch (Throwable t) { - // Ignore - } - } - } - - private void monitor() throws Exception { - JVMShutdownHook logFeederJVMHook = new 
JVMShutdownHook(); - ShutdownHookManager.get().addShutdownHook(logFeederJVMHook, LOGFEEDER_SHUTDOWN_HOOK_PRIORITY); - - statLoggerThread = new Thread("statLogger") { - - @Override - public void run() { - while (true) { - try { - Thread.sleep(30 * 1000); - } catch (Throwable t) { - // Ignore - } - try { - logStats(); - } catch (Throwable t) { - LOG.error("LogStats: Caught exception while logging stats.", t); - } - - if (System.currentTimeMillis() > (lastCheckPointCleanedMS + CHECKPOINT_CLEAN_INTERVAL_MS)) { - lastCheckPointCleanedMS = System.currentTimeMillis(); - configHandler.cleanCheckPointFiles(); - } - } - } - }; - statLoggerThread.setDaemon(true); - statLoggerThread.start(); - - } - - private void logStats() { - configHandler.logStats(); - - if (metricsManager.isMetricsEnabled()) { - List<MetricData> metricsList = new ArrayList<MetricData>(); - configHandler.addMetrics(metricsList); - metricsManager.useMetrics(metricsList); + public static void main(String[] args) { + LogFeederCommandLine cli = new LogFeederCommandLine(args); + if (cli.isTest()) { + test(cli); } + String pidFile = System.getenv("PID_FILE") == null ? 
"logfeeder.pid" : System.getenv("PID_FILE"); + new SpringApplicationBuilder(LogFeeder.class) + .bannerMode(Banner.Mode.OFF) + .listeners(new ApplicationPidFileWriter(pidFile)) + .run(args); } - public void test() { + private static void test(LogFeederCommandLine cli) { try { LogManager.shutdown(); String testLogEntry = cli.getTestLogEntry(); @@ -173,22 +68,4 @@ public class LogFeeder { e.printStackTrace(System.out); } } - - public static void main(String[] args) { - LogFeederCommandLine cli = new LogFeederCommandLine(args); - - LogFeeder logFeeder = new LogFeeder(cli); - - if (cli.isMonitor()) { - try { - LogFeederPropertiesUtil.loadProperties(); - } catch (Throwable t) { - LOG.warn("Could not load logfeeder properites"); - System.exit(1); - } - logFeeder.run(); - } else if (cli.isTest()) { - logFeeder.test(); - } - } } http://git-wip-us.apache.org/repos/asf/ambari/blob/31e8e55a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederCommandLine.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederCommandLine.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederCommandLine.java index d996f98..61e7a1e 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederCommandLine.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederCommandLine.java @@ -30,14 +30,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; public class LogFeederCommandLine { private static final Logger LOG = LoggerFactory.getLogger(LogFeederCommandLine.class); - - private static final String MONITOR_COMMAND = "monitor"; private static final String TEST_COMMAND = "test"; private static final String 
TEST_LOG_ENTRY_OPTION = "test-log-entry"; @@ -60,11 +57,6 @@ public class LogFeederCommandLine { .desc("Print commands") .build(); - Option monitorOption = Option.builder("m") - .longOpt(MONITOR_COMMAND) - .desc("Monitor log files") - .build(); - Option testOption = Option.builder("t") .longOpt(TEST_COMMAND) .desc("Test if log entry is parseable") @@ -95,7 +87,6 @@ public class LogFeederCommandLine { .build(); options.addOption(helpOption); - options.addOption(monitorOption); options.addOption(testOption); options.addOption(testLogEntryOption); options.addOption(testShipperConfOption); @@ -111,21 +102,14 @@ public class LogFeederCommandLine { System.exit(0); } String command = ""; - if (cli.hasOption("m")) { - command = MONITOR_COMMAND; - } else if (cli.hasOption("t")) { + if (cli.hasOption("t")) { command = TEST_COMMAND; validateRequiredOptions(cli, command, testLogEntryOption, testShipperConfOption); } else { - List<String> commands = Arrays.asList(MONITOR_COMMAND, TEST_COMMAND); - helpFormatter.printHelp(COMMAND_LINE_SYNTAX, options); - LOG.error(String.format("One of the supported commands is required (%s)", StringUtils.join(commands, "|"))); - System.exit(1); + LOG.info("Start application in monitor mode "); } } catch (Exception e) { - LOG.error("Error parsing command line parameters", e); - helpFormatter.printHelp(COMMAND_LINE_SYNTAX, options); - System.exit(1); + LOG.info("Error parsing command line parameters: {}. 
LogFeeder will be started in monitoring mode.", e.getMessage()); } } @@ -142,12 +126,8 @@ public class LogFeederCommandLine { } } - public boolean isMonitor() { - return cli.hasOption('m'); - } - public boolean isTest() { - return cli.hasOption('t'); + return cli != null && cli.hasOption('t'); } public String getTestLogEntry() { http://git-wip-us.apache.org/repos/asf/ambari/blob/31e8e55a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java index 243b344..35c0e6a 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java @@ -33,6 +33,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import com.google.common.collect.Maps; +import org.apache.ambari.logfeeder.conf.LogFeederProps; import org.apache.ambari.logfeeder.filter.Filter; import org.apache.ambari.logfeeder.input.Input; import org.apache.ambari.logfeeder.input.InputManager; @@ -47,7 +49,6 @@ import org.apache.commons.io.IOUtils; import org.apache.commons.lang.BooleanUtils; import org.apache.commons.lang3.StringUtils; import org.apache.ambari.logfeeder.util.AliasUtil.AliasType; -import org.apache.ambari.logfeeder.util.LogFeederPropertiesUtil; import org.apache.ambari.logsearch.config.api.InputConfigMonitor; import org.apache.ambari.logsearch.config.api.LogSearchConfigLogFeeder; import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterDescriptor; @@ -62,16 +63,24 @@ import org.apache.log4j.Logger; import 
com.google.gson.reflect.TypeToken; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import javax.inject.Inject; + public class ConfigHandler implements InputConfigMonitor { private static final Logger LOG = Logger.getLogger(ConfigHandler.class); private final LogSearchConfigLogFeeder logSearchConfig; - - private final OutputManager outputManager = new OutputManager(); - private final InputManager inputManager = new InputManager(); + + @Inject + private InputManager inputManager; + @Inject + private OutputManager outputManager; + @Inject + private LogFeederProps logFeederProps; private final Map<String, Object> globalConfigs = new HashMap<>(); - private final List<String> globalConfigJsons = new ArrayList<String>(); + private final List<String> globalConfigJsons = new ArrayList<>(); private final List<InputDescriptor> inputConfigList = new ArrayList<>(); private final List<FilterDescriptor> filterConfigList = new ArrayList<>(); @@ -82,9 +91,11 @@ public class ConfigHandler implements InputConfigMonitor { public ConfigHandler(LogSearchConfigLogFeeder logSearchConfig) { this.logSearchConfig = logSearchConfig; } - + + @PostConstruct public void init() throws Exception { loadConfigFiles(); + logSearchConfig.init(Maps.fromProperties(logFeederProps.getProperties()), logFeederProps.getClusterName()); loadOutputs(); simulateIfNeeded(); @@ -114,7 +125,7 @@ public class ConfigHandler implements InputConfigMonitor { private List<String> getConfigFiles() { List<String> configFiles = new ArrayList<>(); - String logFeederConfigFilesProperty = LogFeederPropertiesUtil.getConfigFiles(); + String logFeederConfigFilesProperty = logFeederProps.getConfigFiles(); LOG.info("logfeeder.config.files=" + logFeederConfigFilesProperty); if (logFeederConfigFilesProperty != null) { configFiles.addAll(Arrays.asList(logFeederConfigFilesProperty.split(","))); @@ -217,7 +228,7 @@ public class ConfigHandler implements InputConfigMonitor { } private void simulateIfNeeded() 
throws Exception { - int simulatedInputNumber = LogFeederPropertiesUtil.getSimulateInputNumber(); + int simulatedInputNumber = logFeederProps.getInputSimulateConfig().getSimulateInputNumber(); if (simulatedInputNumber == 0) return; @@ -347,18 +358,15 @@ public class ConfigHandler implements InputConfigMonitor { } private void sortFilters() { - Collections.sort(filterConfigList, new Comparator<FilterDescriptor>() { - @Override - public int compare(FilterDescriptor o1, FilterDescriptor o2) { - Integer o1Sort = o1.getSortOrder(); - Integer o2Sort = o2.getSortOrder(); - if (o1Sort == null || o2Sort == null) { - return 0; - } - - return o1Sort - o2Sort; + Collections.sort(filterConfigList, (o1, o2) -> { + Integer o1Sort = o1.getSortOrder(); + Integer o2Sort = o2.getSortOrder(); + if (o1Sort == null || o2Sort == null) { + return 0; } - } ); + + return o1Sort - o2Sort; + }); } private void assignOutputsToInputs(String serviceName) { @@ -428,6 +436,7 @@ public class ConfigHandler implements InputConfigMonitor { outputManager.addMetricsContainers(metricsList); } + @PreDestroy public void close() { inputManager.close(); outputManager.close(); http://git-wip-us.apache.org/repos/asf/ambari/blob/31e8e55a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigItem.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigItem.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigItem.java index 5c20a8e..30bd9fd 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigItem.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigItem.java @@ -21,6 +21,7 @@ package org.apache.ambari.logfeeder.common; import java.util.List; +import 
org.apache.ambari.logfeeder.conf.LogFeederProps; import org.apache.ambari.logfeeder.metrics.MetricData; import org.apache.ambari.logfeeder.util.LogFeederUtil; import org.apache.log4j.Logger; @@ -30,6 +31,7 @@ public abstract class ConfigItem { protected static final Logger LOG = Logger.getLogger(ConfigBlock.class); private boolean drain = false; + private LogFeederProps logFeederProps; public MetricData statMetric = new MetricData(getStatMetricName(), false); public ConfigItem() { @@ -59,7 +61,8 @@ public abstract class ConfigItem { /** * This method needs to be overwritten by deriving classes. */ - public void init() throws Exception { + public void init(LogFeederProps logFeederProps) throws Exception { + this.logFeederProps = logFeederProps; } public abstract boolean isEnabled(); @@ -94,4 +97,7 @@ public abstract class ConfigItem { this.drain = drain; } + public LogFeederProps getLogFeederProps() { + return logFeederProps; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/31e8e55a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogEntryParseTester.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogEntryParseTester.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogEntryParseTester.java index ec29f69..1a701e1 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogEntryParseTester.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogEntryParseTester.java @@ -25,6 +25,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.ambari.logfeeder.conf.LogFeederProps; import org.apache.ambari.logfeeder.input.Input; import org.apache.ambari.logfeeder.input.InputMarker; 
import org.apache.ambari.logfeeder.output.Output; @@ -76,7 +77,7 @@ public class LogEntryParseTester { ConfigHandler configHandler = new ConfigHandler(null); Input input = configHandler.getTestInput(inputConfig, logId); final Map<String, Object> result = new HashMap<>(); - input.getFirstFilter().init(); + input.getFirstFilter().init(new LogFeederProps()); input.addOutput(new Output() { @Override public void write(String block, InputMarker inputMarker) throws Exception { http://git-wip-us.apache.org/repos/asf/ambari/blob/31e8e55a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java index a7cccc6..b241831 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java @@ -38,4 +38,54 @@ public class LogFeederConstants { public static final String S3_PATH_SEPARATOR = "/"; public static final String IN_MEMORY_TIMESTAMP = "in_memory_timestamp"; + + public static final String LOGFEEDER_PROPERTIES_FILE = "logfeeder.properties"; + public static final String CLUSTER_NAME_PROPERTY = "cluster.name"; + public static final String TMP_DIR_PROPERTY = "logfeeder.tmp.dir"; + + public static final String METRICS_COLLECTOR_PROTOCOL_PROPERTY = "logfeeder.metrics.collector.protocol"; + public static final String METRICS_COLLECTOR_PORT_PROPERTY = "logfeeder.metrics.collector.port"; + public static final String METRICS_COLLECTOR_HOSTS_PROPERTY = "logfeeder.metrics.collector.hosts"; + public static final String 
METRICS_COLLECTOR_PATH_PROPERTY = "logfeeder.metrics.collector.path"; + + public static final String LOG_FILTER_ENABLE_PROPERTY = "logfeeder.log.filter.enable"; + public static final String INCLUDE_DEFAULT_LEVEL_PROPERTY = "logfeeder.include.default.level"; + + public static final String CONFIG_DIR_PROPERTY = "logfeeder.config.dir"; + public static final String CONFIG_FILES_PROPERTY = "logfeeder.config.files"; + + public static final String SIMULATE_INPUT_NUMBER_PROPERTY = "logfeeder.simulate.input_number"; + public static final int DEFAULT_SIMULATE_INPUT_NUMBER = 0; + public static final String SIMULATE_LOG_LEVEL_PROPERTY = "logfeeder.simulate.log_level"; + public static final String DEFAULT_SIMULATE_LOG_LEVEL = "WARN"; + public static final String SIMULATE_NUMBER_OF_WORDS_PROPERTY = "logfeeder.simulate.number_of_words"; + public static final int DEFAULT_SIMULATE_NUMBER_OF_WORDS = 1000; + public static final String SIMULATE_MIN_LOG_WORDS_PROPERTY = "logfeeder.simulate.min_log_words"; + public static final int DEFAULT_SIMULATE_MIN_LOG_WORDS = 5; + public static final String SIMULATE_MAX_LOG_WORDS_PROPERTY = "logfeeder.simulate.max_log_words"; + public static final int DEFAULT_SIMULATE_MAX_LOG_WORDS = 5; + public static final String SIMULATE_SLEEP_MILLISECONDS_PROPERTY = "logfeeder.simulate.sleep_milliseconds"; + public static final int DEFAULT_SIMULATE_SLEEP_MILLISECONDS = 10000; + public static final String SIMULATE_LOG_IDS_PROPERTY = "logfeeder.simulate.log_ids"; + + public static final String SOLR_KERBEROS_ENABLE_PROPERTY = "logfeeder.solr.kerberos.enable"; + public static final boolean DEFAULT_SOLR_KERBEROS_ENABLE = false; + public static final String DEFAULT_SOLR_JAAS_FILE = "/etc/security/keytabs/logsearch_solr.service.keytab"; + public static final String SOLR_JAAS_FILE_PROPERTY = "logfeeder.solr.jaas.file"; + + public static final String CACHE_ENABLED_PROPERTY = "logfeeder.cache.enabled"; + public static final boolean DEFAULT_CACHE_ENABLED = false; + public 
static final String CACHE_KEY_FIELD_PROPERTY = "logfeeder.cache.key.field"; + public static final String DEFAULT_CACHE_KEY_FIELD = "log_message"; + public static final String CACHE_SIZE_PROPERTY = "logfeeder.cache.size"; + public static final int DEFAULT_CACHE_SIZE = 100; + public static final String CACHE_LAST_DEDUP_ENABLED_PROPERTY = "logfeeder.cache.last.dedup.enabled"; + public static final boolean DEFAULT_CACHE_LAST_DEDUP_ENABLED = false; + public static final String CACHE_DEDUP_INTERVAL_PROPERTY = "logfeeder.cache.dedup.interval"; + public static final long DEFAULT_CACHE_DEDUP_INTERVAL = 1000; + + public static final String CHECKPOINT_FOLDER_PROPERTY = "logfeeder.checkpoint.folder"; + public static final String CHECKPOINT_EXTENSION_PROPERTY = "logfeeder.checkpoint.extension"; + public static final String DEFAULT_CHECKPOINT_EXTENSION = ".cp"; + } http://git-wip-us.apache.org/repos/asf/ambari/blob/31e8e55a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java new file mode 100644 index 0000000..cfb6c78 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.logfeeder.conf; + +import com.google.common.collect.Maps; +import org.apache.ambari.logfeeder.common.ConfigHandler; +import org.apache.ambari.logfeeder.common.LogFeederConstants; +import org.apache.ambari.logfeeder.input.InputConfigUploader; +import org.apache.ambari.logfeeder.input.InputManager; +import org.apache.ambari.logfeeder.loglevelfilter.LogLevelFilterHandler; +import org.apache.ambari.logfeeder.metrics.MetricsManager; +import org.apache.ambari.logfeeder.metrics.StatsLogger; +import org.apache.ambari.logfeeder.output.OutputManager; +import org.apache.ambari.logsearch.config.api.LogSearchConfigFactory; +import org.apache.ambari.logsearch.config.api.LogSearchConfigLogFeeder; +import org.apache.ambari.logsearch.config.zookeeper.LogSearchConfigLogFeederZK; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.DependsOn; +import org.springframework.context.annotation.PropertySource; +import org.springframework.context.support.PropertySourcesPlaceholderConfigurer; + +import javax.inject.Inject; + +@Configuration +@PropertySource(value = { + "classpath:" + LogFeederConstants.LOGFEEDER_PROPERTIES_FILE +}) +public class ApplicationConfig { + + @Inject + private LogFeederProps logFeederProps; + + @Bean + public static PropertySourcesPlaceholderConfigurer propertyConfigurer() { + return new PropertySourcesPlaceholderConfigurer(); + } + + @Bean + public LogFeederSecurityConfig logFeederSecurityConfig() { + return 
new LogFeederSecurityConfig(); + } + + @Bean + @DependsOn("logSearchConfigLogFeeder") + public ConfigHandler configHandler() throws Exception { + return new ConfigHandler(logSearchConfigLogFeeder()); + } + + @Bean + @DependsOn("logFeederSecurityConfig") + public LogSearchConfigLogFeeder logSearchConfigLogFeeder() throws Exception { + return LogSearchConfigFactory.createLogSearchConfigLogFeeder( + Maps.fromProperties(logFeederProps.getProperties()), + logFeederProps.getClusterName(), + LogSearchConfigLogFeederZK.class,false); + } + + @Bean + public MetricsManager metricsManager() { + return new MetricsManager(); + } + + @Bean + @DependsOn("configHandler") + public LogLevelFilterHandler logLevelFilterHandler() throws Exception { + return new LogLevelFilterHandler(logSearchConfigLogFeeder()); + } + + @Bean + @DependsOn({"configHandler", "logSearchConfigLogFeeder", "logLevelFilterHandler"}) + public InputConfigUploader inputConfigUploader() { + return new InputConfigUploader(); + } + + @Bean + @DependsOn("inputConfigUploader") + public StatsLogger statsLogger() { + return new StatsLogger(); + } + + @Bean + public InputManager inputManager() { + return new InputManager(); + } + + @Bean + public OutputManager outputManager() { + return new OutputManager(); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/31e8e55a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/InputSimulateConfig.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/InputSimulateConfig.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/InputSimulateConfig.java new file mode 100644 index 0000000..cf087f9 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/InputSimulateConfig.java @@ -0,0 +1,154 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.logfeeder.conf; + +import org.apache.ambari.logfeeder.common.LogFeederConstants; +import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class InputSimulateConfig { + + @LogSearchPropertyDescription( + name = LogFeederConstants.SIMULATE_INPUT_NUMBER_PROPERTY, + description = "The number of the simulator instances to run with. 
0 means no simulation.", + examples = {"10"}, + defaultValue = LogFeederConstants.DEFAULT_SIMULATE_INPUT_NUMBER + "", + sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE} + ) + @Value("${"+ LogFeederConstants.SIMULATE_INPUT_NUMBER_PROPERTY + ":0}") + private Integer simulateInputNumber; + + @LogSearchPropertyDescription( + name = LogFeederConstants.SIMULATE_LOG_LEVEL_PROPERTY, + description = "The log level to create the simulated log entries with.", + examples = {"INFO"}, + defaultValue = LogFeederConstants.DEFAULT_SIMULATE_LOG_LEVEL, + sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE} + ) + @Value("${"+ LogFeederConstants.SIMULATE_LOG_LEVEL_PROPERTY + ":"+ LogFeederConstants.DEFAULT_SIMULATE_LOG_LEVEL + "}") + private String simulateLogLevel; + + @LogSearchPropertyDescription( + name = LogFeederConstants.SIMULATE_NUMBER_OF_WORDS_PROPERTY, + description = "The size of the set of words that may be used to create the simulated log entries with.", + examples = {"100"}, + defaultValue = LogFeederConstants.DEFAULT_SIMULATE_NUMBER_OF_WORDS + "", + sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE} + ) + @Value("${" + LogFeederConstants.SIMULATE_NUMBER_OF_WORDS_PROPERTY + ":" + LogFeederConstants.DEFAULT_SIMULATE_NUMBER_OF_WORDS + "}") + private Integer simulateNumberOfWords; + + @LogSearchPropertyDescription( + name = LogFeederConstants.SIMULATE_MIN_LOG_WORDS_PROPERTY, + description = "The minimum number of words in a simulated log entry.", + examples = {"3"}, + defaultValue = LogFeederConstants.DEFAULT_SIMULATE_MIN_LOG_WORDS + "", + sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE} + ) + @Value("${" + LogFeederConstants.SIMULATE_MIN_LOG_WORDS_PROPERTY + ":" + LogFeederConstants.DEFAULT_SIMULATE_MIN_LOG_WORDS + "}") + private Integer simulateMinLogWords; + + @LogSearchPropertyDescription( + name = LogFeederConstants.SIMULATE_MAX_LOG_WORDS_PROPERTY, + description = "The maximum number of words in a simulated log entry.", + examples = {"8"}, + 
defaultValue = LogFeederConstants.DEFAULT_SIMULATE_MAX_LOG_WORDS + "", + sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE} + ) + @Value("${" + LogFeederConstants.SIMULATE_MAX_LOG_WORDS_PROPERTY + ":" + LogFeederConstants.DEFAULT_SIMULATE_MAX_LOG_WORDS + "}") + private Integer simulateMaxLogWords; + + @LogSearchPropertyDescription( + name = LogFeederConstants.SIMULATE_SLEEP_MILLISECONDS_PROPERTY, + description = "The milliseconds to sleep between creating two simulated log entries.", + examples = {"5000"}, + defaultValue = LogFeederConstants.DEFAULT_SIMULATE_SLEEP_MILLISECONDS + "", + sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE} + ) + @Value("${" + LogFeederConstants.SIMULATE_SLEEP_MILLISECONDS_PROPERTY + ":" + LogFeederConstants.DEFAULT_SIMULATE_SLEEP_MILLISECONDS + "}") + private Integer simulateSleepMilliseconds; + + @LogSearchPropertyDescription( + name = LogFeederConstants.SIMULATE_LOG_IDS_PROPERTY, + description = "The comma separated list of log ids for which to create the simulated log entries.", + examples = {"ambari_server,zookeeper,infra_solr,logsearch_app"}, + defaultValue = "The log ids of the installed services in the cluster", + sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE} + ) + @Value("${" + LogFeederConstants.SIMULATE_LOG_IDS_PROPERTY + ":}") + private String simulateLogIds; + + public Integer getSimulateInputNumber() { + return simulateInputNumber; + } + + public void setSimulateInputNumber(Integer simulateInputNumber) { + this.simulateInputNumber = simulateInputNumber; + } + + public String getSimulateLogLevel() { + return simulateLogLevel; + } + + public void setSimulateLogLevel(String simulateLogLevel) { + this.simulateLogLevel = simulateLogLevel; + } + + public Integer getSimulateNumberOfWords() { + return simulateNumberOfWords; + } + + public void setSimulateNumberOfWords(Integer simulateNumberOfWords) { + this.simulateNumberOfWords = simulateNumberOfWords; + } + + public Integer getSimulateMinLogWords() { + 
return simulateMinLogWords; + } + + public void setSimulateMinLogWords(Integer simulateMinLogWords) { + this.simulateMinLogWords = simulateMinLogWords; + } + + public Integer getSimulateMaxLogWords() { + return simulateMaxLogWords; + } + + public void setSimulateMaxLogWords(Integer simulateMaxLogWords) { + this.simulateMaxLogWords = simulateMaxLogWords; + } + + public Integer getSimulateSleepMilliseconds() { + return simulateSleepMilliseconds; + } + + public void setSimulateSleepMilliseconds(Integer simulateSleepMilliseconds) { + this.simulateSleepMilliseconds = simulateSleepMilliseconds; + } + + public String getSimulateLogIds() { + return simulateLogIds; + } + + public void setSimulateLogIds(String simulateLogIds) { + this.simulateLogIds = simulateLogIds; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/31e8e55a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogEntryCacheConfig.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogEntryCacheConfig.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogEntryCacheConfig.java new file mode 100644 index 0000000..353bdc1 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogEntryCacheConfig.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.logfeeder.conf; + +import org.apache.ambari.logfeeder.common.LogFeederConstants; +import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class LogEntryCacheConfig { + + @LogSearchPropertyDescription( + name = LogFeederConstants.CACHE_ENABLED_PROPERTY, + description = "Enables the usage of a cache to avoid duplications.", + examples = {"true"}, + defaultValue = LogFeederConstants.DEFAULT_CACHE_ENABLED + "", + sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE} + ) + @Value("${" + LogFeederConstants.CACHE_ENABLED_PROPERTY + ":" + LogFeederConstants.DEFAULT_CACHE_ENABLED + "}") + private boolean cacheEnabled; + + @LogSearchPropertyDescription( + name = LogFeederConstants.CACHE_KEY_FIELD_PROPERTY, + description = "The field whose value should be cached and should be checked for repetitions.", + examples = {"some_field_prone_to_repeating_value"}, + defaultValue = LogFeederConstants.DEFAULT_CACHE_KEY_FIELD, + sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE} + ) + @Value("${" + LogFeederConstants.CACHE_KEY_FIELD_PROPERTY + ":" + LogFeederConstants.DEFAULT_CACHE_KEY_FIELD + "}") + private String cacheKeyField; + + @LogSearchPropertyDescription( + name = LogFeederConstants.CACHE_SIZE_PROPERTY, + description = "The number of log entries to cache in order to avoid duplications.", + examples = {"50"}, + defaultValue = 
LogFeederConstants.DEFAULT_CACHE_SIZE + "", + sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE} + ) + @Value("${" + LogFeederConstants.CACHE_SIZE_PROPERTY + ":" + LogFeederConstants.DEFAULT_CACHE_SIZE + "}") + private Integer cacheSize; + + @LogSearchPropertyDescription( + name = LogFeederConstants.CACHE_LAST_DEDUP_ENABLED_PROPERTY, + description = "Enable filtering directly repeating log entries irrelevant of the time spent between them.", + examples = {"true"}, + defaultValue = LogFeederConstants.DEFAULT_CACHE_LAST_DEDUP_ENABLED + "", + sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE} + ) + @Value("${" + LogFeederConstants.CACHE_LAST_DEDUP_ENABLED_PROPERTY + ":" + LogFeederConstants.DEFAULT_CACHE_LAST_DEDUP_ENABLED + "}") + private Boolean cacheLastDedupEnabled; + + @LogSearchPropertyDescription( + name = LogFeederConstants.CACHE_DEDUP_INTERVAL_PROPERTY, + description = "Maximum number of milliseconds between two identical messages to be filtered out.", + examples = {"500"}, + defaultValue = LogFeederConstants.DEFAULT_CACHE_DEDUP_INTERVAL + "", + sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE} + ) + @Value("${" + LogFeederConstants.CACHE_DEDUP_INTERVAL_PROPERTY + ":" + LogFeederConstants.DEFAULT_CACHE_DEDUP_INTERVAL + "}") + private String cacheDedupInterval; + + public boolean isCacheEnabled() { + return cacheEnabled; + } + + public void setCacheEnabled(boolean cacheEnabled) { + this.cacheEnabled = cacheEnabled; + } + + public String getCacheKeyField() { + return cacheKeyField; + } + + public void setCacheKeyField(String cacheKeyField) { + this.cacheKeyField = cacheKeyField; + } + + public Integer getCacheSize() { + return cacheSize; + } + + public void setCacheSize(Integer cacheSize) { + this.cacheSize = cacheSize; + } + + public boolean isCacheLastDedupEnabled() { + return cacheLastDedupEnabled; + } + + public void setCacheLastDedupEnabled(boolean cacheLastDedupEnabled) { + this.cacheLastDedupEnabled = cacheLastDedupEnabled; + } + + 
public String getCacheDedupInterval() { + return cacheDedupInterval; + } + + public void setCacheDedupInterval(String cacheDedupInterval) { + this.cacheDedupInterval = cacheDedupInterval; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/31e8e55a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java new file mode 100644 index 0000000..367d1cd --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.ambari.logfeeder.conf; + +import org.apache.ambari.logfeeder.common.LogFeederConstants; +import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.env.AbstractEnvironment; +import org.springframework.core.env.Environment; +import org.springframework.core.env.MapPropertySource; +import org.springframework.core.env.MutablePropertySources; +import org.springframework.core.io.support.ResourcePropertySource; + +import javax.annotation.PostConstruct; +import javax.inject.Inject; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import java.util.stream.Stream; + +@Configuration +public class LogFeederProps { + + @Inject + private Environment env; + + private Properties properties; + + @LogSearchPropertyDescription( + name = LogFeederConstants.CLUSTER_NAME_PROPERTY, + description = "The name of the cluster the Log Feeder program runs in.", + examples = {"cl1"}, + sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE} + ) + @Value("${" + LogFeederConstants.CLUSTER_NAME_PROPERTY + "}") + private String clusterName; + + @LogSearchPropertyDescription( + name = LogFeederConstants.TMP_DIR_PROPERTY, + description = "The tmp dir used for creating temporary files.", + examples = {"/tmp/"}, + defaultValue = "java.io.tmpdir", + sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE} + ) + @Value("${"+ LogFeederConstants.TMP_DIR_PROPERTY + ":#{systemProperties['java.io.tmpdir']}}") + private String tmpDir; + + @LogSearchPropertyDescription( + name = LogFeederConstants.LOG_FILTER_ENABLE_PROPERTY, + description = "Enables the filtering of the log entries by log level filters.", + examples = {"true"}, + defaultValue = "false", + sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE} + ) + @Value("${"+ LogFeederConstants.LOG_FILTER_ENABLE_PROPERTY + "}") + 
private boolean logLevelFilterEnabled; + + @LogSearchPropertyDescription( + name = LogFeederConstants.INCLUDE_DEFAULT_LEVEL_PROPERTY, + description = "Comma separated list of the default log levels to be enabled by the filtering.", + examples = {"FATAL,ERROR,WARN"}, + sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE} + ) + @Value("#{'${" + LogFeederConstants.INCLUDE_DEFAULT_LEVEL_PROPERTY + ":}'.split(',')}") + private List<String> includeDefaultLogLevels; + + @LogSearchPropertyDescription( + name = LogFeederConstants.CONFIG_DIR_PROPERTY, + description = "The directory where shipper configuration files are looked for.", + examples = {"/etc/ambari-logsearch-logfeeder/conf"}, + defaultValue = "/etc/ambari-logsearch-logfeeder/conf", + sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE} + ) + @Value("${"+ LogFeederConstants.CONFIG_DIR_PROPERTY + ":/etc/ambari-logsearch-logfeeder/conf}") + private String confDir; + + @LogSearchPropertyDescription( + name = LogFeederConstants.CONFIG_FILES_PROPERTY, + description = "Comma separated list of the config files containing global / output configurations.", + examples = {"global.json,output.json", "/etc/ambari-logsearch-logfeeder/conf/global.json"}, + sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE} + ) + @Value("${"+ LogFeederConstants.CONFIG_FILES_PROPERTY + ":}") + private String configFiles; + + @LogSearchPropertyDescription( + name = LogFeederConstants.CHECKPOINT_EXTENSION_PROPERTY, + description = "The extension used for checkpoint files.", + examples = {"ckp"}, + defaultValue = LogFeederConstants.DEFAULT_CHECKPOINT_EXTENSION, + sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE} + ) + @Value("${" + LogFeederConstants.CHECKPOINT_EXTENSION_PROPERTY + ":" + LogFeederConstants.DEFAULT_CHECKPOINT_EXTENSION + "}") + private String checkPointExtension; + + @LogSearchPropertyDescription( + name = LogFeederConstants.CHECKPOINT_FOLDER_PROPERTY, + description = "The folder where checkpoint files are stored.", 
+ examples = {"/etc/ambari-logsearch-logfeeder/conf/checkpoints"}, + sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE} + ) + @Value("${" + LogFeederConstants.CHECKPOINT_FOLDER_PROPERTY + ":/etc/ambari-logsearch-logfeeder/conf/checkpoints}") + public String checkpointFolder; + + @Inject + private LogEntryCacheConfig logEntryCacheConfig; + + @Inject + private InputSimulateConfig inputSimulateConfig; + + @Inject + private LogFeederSecurityConfig logFeederSecurityConfig; + + public String getClusterName() { + return clusterName; + } + + public void setClusterName(String clusterName) { + this.clusterName = clusterName; + } + + public Properties getProperties() { + return properties; + } + + public String getTmpDir() { + return tmpDir; + } + + public boolean isLogLevelFilterEnabled() { + return logLevelFilterEnabled; + } + + public List<String> getIncludeDefaultLogLevels() { + return includeDefaultLogLevels; + } + + public String getConfDir() { + return confDir; + } + + public void setConfDir(String confDir) { + this.confDir = confDir; + } + + public String getConfigFiles() { + return configFiles; + } + + public void setConfigFiles(String configFiles) { + this.configFiles = configFiles; + } + + public LogEntryCacheConfig getLogEntryCacheConfig() { + return logEntryCacheConfig; + } + + public void setLogEntryCacheConfig(LogEntryCacheConfig logEntryCacheConfig) { + this.logEntryCacheConfig = logEntryCacheConfig; + } + + public InputSimulateConfig getInputSimulateConfig() { + return inputSimulateConfig; + } + + public void setInputSimulateConfig(InputSimulateConfig inputSimulateConfig) { + this.inputSimulateConfig = inputSimulateConfig; + } + + public LogFeederSecurityConfig getLogFeederSecurityConfig() { + return logFeederSecurityConfig; + } + + public void setLogFeederSecurityConfig(LogFeederSecurityConfig logFeederSecurityConfig) { + this.logFeederSecurityConfig = logFeederSecurityConfig; + } + + public String getCheckPointExtension() { + return 
checkPointExtension; + } + + public void setCheckPointExtension(String checkPointExtension) { + this.checkPointExtension = checkPointExtension; + } + + public String getCheckpointFolder() { + return checkpointFolder; + } + + public void setCheckpointFolder(String checkpointFolder) { + this.checkpointFolder = checkpointFolder; + } + + @PostConstruct + public void init() { + properties = new Properties(); + MutablePropertySources propSrcs = ((AbstractEnvironment) env).getPropertySources(); + ResourcePropertySource propertySource = (ResourcePropertySource) propSrcs.get("class path resource [" + + LogFeederConstants.LOGFEEDER_PROPERTIES_FILE + "]"); + if (propertySource != null) { + Stream.of(propertySource) + .map(MapPropertySource::getPropertyNames) + .flatMap(Arrays::<String>stream) + .forEach(propName -> properties.setProperty(propName, env.getProperty(propName))); + } else { + throw new IllegalArgumentException("Cannot find logfeeder.properties on the classpath"); + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/31e8e55a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederSecurityConfig.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederSecurityConfig.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederSecurityConfig.java new file mode 100644 index 0000000..8a45753 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederSecurityConfig.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.conf;
+
+import org.apache.ambari.logfeeder.common.LogFeederConstants;
+import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.ArrayUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+
+import javax.annotation.PostConstruct;
+import java.io.File;
+import java.nio.charset.Charset;
+
+/**
+ * Security related Log Feeder settings: the credential store path, the Solr JAAS file and the Solr
+ * kerberos flag are injected from properties, while the SSL store locations, types and passwords
+ * are read from the standard {@code javax.net.ssl.*} system properties.
+ * <p>
+ * After construction, {@link #ensureStorePasswords()} fills in a missing keystore / truststore
+ * password system property whenever the matching store location is set.
+ */
+public class LogFeederSecurityConfig {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LogFeederSecurityConfig.class);
+
+  private static final String KEYSTORE_LOCATION_ARG = "javax.net.ssl.keyStore";
+  private static final String TRUSTSTORE_LOCATION_ARG = "javax.net.ssl.trustStore";
+  private static final String KEYSTORE_TYPE_ARG = "javax.net.ssl.keyStoreType";
+  private static final String TRUSTSTORE_TYPE_ARG = "javax.net.ssl.trustStoreType";
+  private static final String KEYSTORE_PASSWORD_ARG = "javax.net.ssl.keyStorePassword";
+  private static final String TRUSTSTORE_PASSWORD_ARG = "javax.net.ssl.trustStorePassword";
+  private static final String KEYSTORE_PASSWORD_PROPERTY_NAME = "logfeeder_keystore_password";
+  private static final String TRUSTSTORE_PASSWORD_PROPERTY_NAME = "logfeeder_truststore_password";
+  private static final String KEYSTORE_PASSWORD_FILE = "ks_pass.txt";
+  private static final String TRUSTSTORE_PASSWORD_FILE = "ts_pass.txt";
+
+  private static final String LOGFEEDER_CERT_DEFAULT_FOLDER = "/etc/ambari-logsearch-portal/conf/keys";
+  private static final String LOGFEEDER_STORE_DEFAULT_PASSWORD = "bigdata";
+
+  private static final String CREDENTIAL_STORE_PROVIDER_PATH_PROPERTY = "hadoop.security.credential.provider.path";
+
+  @LogSearchPropertyDescription(
+    name = CREDENTIAL_STORE_PROVIDER_PATH_PROPERTY,
+    description = "The jceks file that provides passwords.",
+    examples = {"jceks://file/etc/ambari-logsearch-logfeeder/conf/logfeeder.jceks"},
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${"+ CREDENTIAL_STORE_PROVIDER_PATH_PROPERTY + ":}")
+  private String credentialStoreProviderPath;
+
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.SOLR_JAAS_FILE_PROPERTY,
+    description = "The jaas file used for solr.",
+    examples = {"/etc/ambari-logsearch-logfeeder/conf/logfeeder_jaas.conf"},
+    defaultValue = LogFeederConstants.DEFAULT_SOLR_JAAS_FILE,
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${" + LogFeederConstants.SOLR_JAAS_FILE_PROPERTY + ":" + LogFeederConstants.DEFAULT_SOLR_JAAS_FILE + "}")
+  private String solrJaasFile;
+
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.SOLR_KERBEROS_ENABLE_PROPERTY,
+    description = "Enables using kerberos for accessing solr.",
+    examples = {"true"},
+    defaultValue = LogFeederConstants.DEFAULT_SOLR_KERBEROS_ENABLE + "",
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${"+ LogFeederConstants.SOLR_KERBEROS_ENABLE_PROPERTY + ":" + LogFeederConstants.DEFAULT_SOLR_KERBEROS_ENABLE + "}")
+  private Boolean solrKerberosEnabled;
+
+  /** @return the value of the {@code javax.net.ssl.keyStore} system property, may be null */
+  public String getKeyStoreLocation() {
+    return System.getProperty(KEYSTORE_LOCATION_ARG);
+  }
+
+  /** @return the value of the {@code javax.net.ssl.keyStoreType} system property, may be null */
+  public String getKeyStoreType() {
+    return System.getProperty(KEYSTORE_TYPE_ARG);
+  }
+
+  /** @return the value of the {@code javax.net.ssl.keyStorePassword} system property, may be null */
+  public String getKeyStorePassword() {
+    return System.getProperty(KEYSTORE_PASSWORD_ARG);
+  }
+
+  /** @return the value of the {@code javax.net.ssl.trustStore} system property, may be null */
+  public String getTrustStoreLocation() {
+    return System.getProperty(TRUSTSTORE_LOCATION_ARG);
+  }
+
+  /** @return the value of the {@code javax.net.ssl.trustStoreType} system property, may be null */
+  public String getTrustStoreType() {
+    return System.getProperty(TRUSTSTORE_TYPE_ARG);
+  }
+
+  /** @return the value of the {@code javax.net.ssl.trustStorePassword} system property, may be null */
+  public String getTrustStorePassword() {
+    return System.getProperty(TRUSTSTORE_PASSWORD_ARG);
+  }
+
+  public String getCredentialStoreProviderPath() {
+    return credentialStoreProviderPath;
+  }
+
+  public void setCredentialStoreProviderPath(String credentialStoreProviderPath) {
+    this.credentialStoreProviderPath = credentialStoreProviderPath;
+  }
+
+  public String getSolrJaasFile() {
+    return solrJaasFile;
+  }
+
+  public void setSolrJaasFile(String solrJaasFile) {
+    this.solrJaasFile = solrJaasFile;
+  }
+
+  /**
+   * @return true only when the kerberos flag is explicitly set to true; an unset (null) flag is
+   *         reported as false instead of failing on auto-unboxing
+   */
+  public boolean isSolrKerberosEnabled() {
+    return Boolean.TRUE.equals(solrKerberosEnabled);
+  }
+
+  public void setSolrKerberosEnabled(Boolean solrKerberosEnabled) {
+    this.solrKerberosEnabled = solrKerberosEnabled;
+  }
+
+  /**
+   * Makes sure that both the keystore and the truststore password system properties are set
+   * when the corresponding store location system property is present.
+   */
+  @PostConstruct
+  public void ensureStorePasswords() {
+    ensureStorePassword(KEYSTORE_LOCATION_ARG, KEYSTORE_PASSWORD_ARG, KEYSTORE_PASSWORD_PROPERTY_NAME, KEYSTORE_PASSWORD_FILE);
+    ensureStorePassword(TRUSTSTORE_LOCATION_ARG, TRUSTSTORE_PASSWORD_ARG, TRUSTSTORE_PASSWORD_PROPERTY_NAME, TRUSTSTORE_PASSWORD_FILE);
+  }
+
+  /**
+   * Sets the password system property of a store if its location is configured but its password is not.
+   *
+   * @param locationArg system property holding the store location
+   * @param pwdArg system property holding the store password
+   * @param propertyName alias of the password inside the credential store
+   * @param fileName name of the password file inside the default certificate folder
+   */
+  private void ensureStorePassword(String locationArg, String pwdArg, String propertyName, String fileName) {
+    if (StringUtils.isNotEmpty(System.getProperty(locationArg)) && StringUtils.isEmpty(System.getProperty(pwdArg))) {
+      String password = getPassword(propertyName, fileName);
+      System.setProperty(pwdArg, password);
+    }
+  }
+
+  /**
+   * Resolves a store password: credential store first, then the password file, finally the built-in default.
+   *
+   * @return a non-null password
+   */
+  private String getPassword(String propertyName, String fileName) {
+    String credentialStorePassword = getPasswordFromCredentialStore(propertyName);
+    if (credentialStorePassword != null) {
+      return credentialStorePassword;
+    }
+
+    String filePassword = getPasswordFromFile(fileName);
+    if (filePassword != null) {
+      return filePassword;
+    }
+
+    return LOGFEEDER_STORE_DEFAULT_PASSWORD;
+  }
+
+  /**
+   * Reads a password from the hadoop credential store referenced by {@code credentialStoreProviderPath}.
+   *
+   * @return the password, or null if no provider path is configured, the alias holds no value,
+   *         or the lookup failed
+   */
+  private String getPasswordFromCredentialStore(String propertyName) {
+    try {
+      if (StringUtils.isEmpty(credentialStoreProviderPath)) {
+        return null;
+      }
+
+      org.apache.hadoop.conf.Configuration config = new org.apache.hadoop.conf.Configuration();
+      config.set(CREDENTIAL_STORE_PROVIDER_PATH_PROPERTY, credentialStoreProviderPath);
+      char[] passwordChars = config.getPassword(propertyName);
+      return (ArrayUtils.isNotEmpty(passwordChars)) ? new String(passwordChars) : null;
+    } catch (Exception e) {
+      // keep the cause in the log; the caller falls back to the password file before the default password
+      LOG.warn(String.format("Could not load password %s from credential store, falling back to password file or default password", propertyName), e);
+      return null;
+    }
+  }
+
+  /**
+   * Reads a password from a file in the default certificate folder; if the file does not exist yet,
+   * it is created with the default password.
+   *
+   * @return the password, or null if the file is empty or could not be read / written
+   */
+  private String getPasswordFromFile(String fileName) {
+    try {
+      File pwdFile = new File(LOGFEEDER_CERT_DEFAULT_FOLDER, fileName);
+      if (!pwdFile.exists()) {
+        FileUtils.writeStringToFile(pwdFile, LOGFEEDER_STORE_DEFAULT_PASSWORD, Charset.defaultCharset());
+        return LOGFEEDER_STORE_DEFAULT_PASSWORD;
+      }
+      // a trailing line terminator (e.g. file written by an editor or echo) is not part of the password
+      String password = StringUtils.chomp(FileUtils.readFileToString(pwdFile, Charset.defaultCharset()));
+      // an empty password counts as "not set" in ensureStorePassword, so let the caller use the default
+      return StringUtils.isEmpty(password) ? null : password;
+    } catch (Exception e) {
+      LOG.warn("Exception occurred during read/write password file for keystore/truststore.", e);
+      return null;
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/31e8e55a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/MetricsCollectorConfig.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/MetricsCollectorConfig.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/MetricsCollectorConfig.java
new file mode 100644
index 0000000..4b3c6fb
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/MetricsCollectorConfig.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or
more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.conf;
+
+import com.google.common.base.Splitter;
+import org.apache.ambari.logfeeder.common.LogFeederConstants;
+import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription;
+import org.apache.commons.lang.StringUtils;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Configuration;
+
+import javax.annotation.PostConstruct;
+import java.util.List;
+
+/**
+ * Metrics collector connection settings (hosts, protocol, port, path) injected from properties.
+ * The comma separated hosts property is split into {@link #getHosts()} once injection is done.
+ */
+@Configuration
+public class MetricsCollectorConfig {
+
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.METRICS_COLLECTOR_HOSTS_PROPERTY,
+    description = "Comma separated list of metric collector hosts.",
+    examples = {"c6401.ambari.apache.org,c6402.ambari.apache.org"},
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${" + LogFeederConstants.METRICS_COLLECTOR_HOSTS_PROPERTY + ":}")
+  private String hostsString;
+
+  private List<String> hosts;
+
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.METRICS_COLLECTOR_PROTOCOL_PROPERTY,
+    description = "The protocol used by metric collectors.",
+    examples = {"http", "https"},
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${" + LogFeederConstants.METRICS_COLLECTOR_PROTOCOL_PROPERTY + ":#{NULL}}")
+  private String protocol;
+
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.METRICS_COLLECTOR_PORT_PROPERTY,
+    description = "The port used by metric collectors.",
+    examples = {"6188"},
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${" + LogFeederConstants.METRICS_COLLECTOR_PORT_PROPERTY + ":#{NULL}}")
+  private String port;
+
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.METRICS_COLLECTOR_PATH_PROPERTY,
+    description = "The path used by metric collectors.",
+    examples = {"/ws/v1/timeline/metrics"},
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${" + LogFeederConstants.METRICS_COLLECTOR_PATH_PROPERTY + ":#{NULL}}")
+  private String path;
+
+  /** @return the parsed collector hosts, or null when no host is configured */
+  public List<String> getHosts() {
+    return hosts;
+  }
+
+  public void setHosts(List<String> hosts) {
+    this.hosts = hosts;
+  }
+
+  public String getProtocol() {
+    return protocol;
+  }
+
+  public void setProtocol(String protocol) {
+    this.protocol = protocol;
+  }
+
+  public String getPort() {
+    return port;
+  }
+
+  public void setPort(String port) {
+    this.port = port;
+  }
+
+  public String getPath() {
+    return path;
+  }
+
+  public void setPath(String path) {
+    this.path = path;
+  }
+
+  /** @return the raw, comma separated hosts property value */
+  public String getHostsString() {
+    return hostsString;
+  }
+
+  /**
+   * Splits the comma separated hosts property into the hosts list. Whitespace around host names
+   * and empty entries (e.g. from a trailing comma) are dropped; if nothing usable remains,
+   * the hosts list is null.
+   */
+  @PostConstruct
+  public void init() {
+    if (StringUtils.isNotBlank(hostsString)) {
+      List<String> parsedHosts = Splitter.on(',').trimResults().omitEmptyStrings().splitToList(hostsString);
+      // keep the "no hosts configured" signal as null, same as for a blank property
+      hosts = parsedHosts.isEmpty() ? null : parsedHosts;
+    } else {
+      hosts = null;
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/31e8e55a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java
index 8e8834b..a06b348 100644
--- 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java @@ -26,6 +26,7 @@ import java.util.Map; import org.apache.ambari.logfeeder.common.ConfigItem; import org.apache.ambari.logfeeder.common.LogFeederException; +import org.apache.ambari.logfeeder.conf.LogFeederProps; import org.apache.ambari.logfeeder.input.Input; import org.apache.ambari.logfeeder.input.InputMarker; import org.apache.ambari.logfeeder.mapper.Mapper; @@ -56,12 +57,12 @@ public abstract class Filter extends ConfigItem { } @Override - public void init() throws Exception { - super.init(); + public void init(LogFeederProps logFeederProps) throws Exception { + super.init(logFeederProps); initializePostMapValues(); if (nextFilter != null) { - nextFilter.init(); + nextFilter.init(logFeederProps); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/31e8e55a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java index fc7a565..f0ef31b 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java @@ -35,6 +35,7 @@ import oi.thekraken.grok.api.Grok; import oi.thekraken.grok.api.exception.GrokException; import org.apache.ambari.logfeeder.common.LogFeederException; +import org.apache.ambari.logfeeder.conf.LogFeederProps; import org.apache.ambari.logfeeder.input.InputMarker; import 
org.apache.ambari.logfeeder.metrics.MetricData; import org.apache.ambari.logfeeder.util.LogFeederUtil; @@ -73,13 +74,13 @@ public class FilterGrok extends Filter { private MetricData grokErrorMetric = new MetricData("filter.error.grok", false); @Override - public void init() throws Exception { - super.init(); + public void init(LogFeederProps logFeederProps) throws Exception { + super.init(logFeederProps); try { messagePattern = escapePattern(((FilterGrokDescriptor)filterDescriptor).getMessagePattern()); multilinePattern = escapePattern(((FilterGrokDescriptor)filterDescriptor).getMultilinePattern()); - sourceField = ((FilterGrokDescriptor)filterDescriptor).getSourceField(); + sourceField = filterDescriptor.getSourceField(); removeSourceField = BooleanUtils.toBooleanDefaultIfNull(filterDescriptor.isRemoveSourceField(), removeSourceField); LOG.info("init() done. grokPattern=" + messagePattern + ", multilinePattern=" + multilinePattern + ", " + http://git-wip-us.apache.org/repos/asf/ambari/blob/31e8e55a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java index 8e5aee8..adcf0a4 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.regex.Pattern; import org.apache.ambari.logfeeder.common.LogFeederException; +import org.apache.ambari.logfeeder.conf.LogFeederProps; import org.apache.ambari.logfeeder.input.InputMarker; import 
org.apache.ambari.logfeeder.metrics.MetricData; import org.apache.ambari.logfeeder.util.LogFeederUtil; @@ -41,8 +42,8 @@ public class FilterKeyValue extends Filter { private MetricData errorMetric = new MetricData("filter.error.keyvalue", false); @Override - public void init() throws Exception { - super.init(); + public void init(LogFeederProps logFeederProps) throws Exception { + super.init(logFeederProps); sourceField = filterDescriptor.getSourceField(); valueSplit = StringUtils.defaultString(((FilterKeyValueDescriptor)filterDescriptor).getValueSplit(), valueSplit); http://git-wip-us.apache.org/repos/asf/ambari/blob/31e8e55a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java index b021c37..cf295c5 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java @@ -28,8 +28,8 @@ import java.util.Date; import java.util.HashMap; import java.util.Map; +import org.apache.ambari.logfeeder.conf.LogFeederProps; import org.apache.ambari.logfeeder.util.FileUtil; -import org.apache.ambari.logfeeder.util.LogFeederPropertiesUtil; import org.apache.ambari.logfeeder.util.LogFeederUtil; import org.apache.ambari.logsearch.config.api.model.inputconfig.InputFileBaseDescriptor; import org.apache.commons.lang.ObjectUtils; @@ -55,6 +55,8 @@ public abstract class AbstractInputFile extends Input { private Map<String, Map<String, Object>> jsonCheckPoints = new HashMap<>(); private Map<String, InputMarker> lastCheckPointInputMarkers = 
new HashMap<>(); + private LogFeederProps logFeederProps; + @Override protected String getStatMetricName() { return "input.files.read_lines"; @@ -66,10 +68,11 @@ public abstract class AbstractInputFile extends Input { } @Override - public void init() throws Exception { + public void init(LogFeederProps logFeederProps) throws Exception { + this.logFeederProps = logFeederProps; LOG.info("init() called"); - checkPointExtension = LogFeederPropertiesUtil.getCheckPointExtension(); + checkPointExtension = logFeederProps.getCheckPointExtension(); // Let's close the file and set it to true after we start monitoring it setClosed(true); @@ -86,7 +89,7 @@ public abstract class AbstractInputFile extends Input { LOG.info("File to monitor " + logPath + ", tail=" + tail + ", isReady=" + isFileReady); - super.init(); + super.init(logFeederProps); } protected void processFile(File logPathFile, boolean follow) throws FileNotFoundException, IOException { http://git-wip-us.apache.org/repos/asf/ambari/blob/31e8e55a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java index 972011d..7b9dcd4 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java @@ -24,6 +24,8 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import org.apache.ambari.logfeeder.conf.LogEntryCacheConfig; +import org.apache.ambari.logfeeder.conf.LogFeederProps; import org.apache.ambari.logfeeder.input.cache.LRUCache; import org.apache.ambari.logfeeder.common.ConfigItem; import 
org.apache.ambari.logfeeder.common.LogFeederException; @@ -31,7 +33,6 @@ import org.apache.ambari.logfeeder.filter.Filter; import org.apache.ambari.logfeeder.metrics.MetricData; import org.apache.ambari.logfeeder.output.Output; import org.apache.ambari.logfeeder.output.OutputManager; -import org.apache.ambari.logfeeder.util.LogFeederPropertiesUtil; import org.apache.ambari.logsearch.config.api.model.inputconfig.Conditions; import org.apache.ambari.logsearch.config.api.model.inputconfig.Fields; import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterDescriptor; @@ -49,7 +50,7 @@ public abstract class Input extends ConfigItem implements Runnable { protected InputManager inputManager; protected OutputManager outputManager; - private List<Output> outputList = new ArrayList<Output>(); + private List<Output> outputList = new ArrayList<>(); private Thread thread; private String type; @@ -128,15 +129,15 @@ public abstract class Input extends ConfigItem implements Runnable { } @Override - public void init() throws Exception { - super.init(); - initCache(); + public void init(LogFeederProps logFeederProps) throws Exception { + super.init(logFeederProps); + initCache(logFeederProps.getLogEntryCacheConfig()); tail = BooleanUtils.toBooleanDefaultIfNull(inputDescriptor.isTail(), DEFAULT_TAIL); useEventMD5 = BooleanUtils.toBooleanDefaultIfNull(inputDescriptor.isUseEventMd5AsId(), DEFAULT_USE_EVENT_MD5); genEventMD5 = BooleanUtils.toBooleanDefaultIfNull(inputDescriptor.isGenEventMd5(), DEFAULT_GEN_EVENT_MD5); if (firstFilter != null) { - firstFilter.init(); + firstFilter.init(logFeederProps); } } @@ -239,28 +240,28 @@ public abstract class Input extends ConfigItem implements Runnable { } } - private void initCache() { + private void initCache(LogEntryCacheConfig cacheConfig) { boolean cacheEnabled = inputDescriptor.isCacheEnabled() != null ? 
inputDescriptor.isCacheEnabled() - : LogFeederPropertiesUtil.isCacheEnabled(); + : cacheConfig.isCacheEnabled(); if (cacheEnabled) { String cacheKeyField = inputDescriptor.getCacheKeyField() != null ? inputDescriptor.getCacheKeyField() - : LogFeederPropertiesUtil.getCacheKeyField(); + : cacheConfig.getCacheKeyField(); setCacheKeyField(cacheKeyField); int cacheSize = inputDescriptor.getCacheSize() != null ? inputDescriptor.getCacheSize() - : LogFeederPropertiesUtil.getCacheSize(); + : cacheConfig.getCacheSize(); boolean cacheLastDedupEnabled = inputDescriptor.getCacheLastDedupEnabled() != null ? inputDescriptor.getCacheLastDedupEnabled() - : LogFeederPropertiesUtil.isCacheLastDedupEnabled(); + : cacheConfig.isCacheLastDedupEnabled(); long cacheDedupInterval = inputDescriptor.getCacheDedupInterval() != null ? inputDescriptor.getCacheDedupInterval() - : Long.parseLong(LogFeederPropertiesUtil.getCacheDedupInterval()); + : Long.parseLong(cacheConfig.getCacheDedupInterval()); setCache(new LRUCache(cacheSize, filePath, cacheDedupInterval, cacheLastDedupEnabled)); }