AMBARI-21399 Create property descriptions for internal Log Feeder configs (mgergely)
Change-Id: I51bf4322184da06084c1b2af35fedd6ee19ab36e Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/5c9bdbfe Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/5c9bdbfe Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/5c9bdbfe Branch: refs/heads/branch-feature-AMBARI-14714 Commit: 5c9bdbfeff7b5b78adf4d840fbd93c504f129830 Parents: 8d9fd45 Author: Miklos Gergely <mgerg...@hortonworks.com> Authored: Thu Jul 6 11:08:48 2017 +0200 Committer: Miklos Gergely <mgerg...@hortonworks.com> Committed: Thu Jul 6 11:08:48 2017 +0200 ---------------------------------------------------------------------- .../logsearch/config/api/LogSearchConfig.java | 7 +- .../config/api/LogSearchConfigFactory.java | 8 +- .../config/api/LogSearchConfigClass1.java | 6 +- .../config/api/LogSearchConfigClass2.java | 6 +- .../config/api/LogSearchConfigFactoryTest.java | 6 +- .../config/zookeeper/LogSearchConfigZK.java | 25 ++----- .../org/apache/ambari/logfeeder/LogFeeder.java | 34 ++------- .../ambari/logfeeder/common/ConfigHandler.java | 52 ++++++------- .../logfeeder/common/LogFeederException.java | 31 ++++++++ .../logfeeder/common/LogfeederException.java | 31 -------- .../apache/ambari/logfeeder/filter/Filter.java | 6 +- .../ambari/logfeeder/filter/FilterGrok.java | 10 +-- .../ambari/logfeeder/filter/FilterJSON.java | 6 +- .../ambari/logfeeder/filter/FilterKeyValue.java | 6 +- .../logfeeder/input/AbstractInputFile.java | 3 +- .../apache/ambari/logfeeder/input/Input.java | 72 +++++++++++++++--- .../logfeeder/input/InputConfigUploader.java | 18 ++++- .../ambari/logfeeder/input/InputManager.java | 45 +++++++----- .../ambari/logfeeder/input/InputSimulate.java | 74 +++++++++++++++++-- .../loglevelfilter/LogLevelFilterHandler.java | 28 ++++++- .../logfeeder/metrics/LogFeederAMSClient.java | 43 ++++++++++- .../logfeeder/metrics/MetricsManager.java | 25 +------ .../ambari/logfeeder/output/OutputHDFSFile.java | 10 +-- 
.../ambari/logfeeder/output/OutputS3File.java | 2 +- .../ambari/logfeeder/output/OutputSolr.java | 28 ++++++- .../apache/ambari/logfeeder/util/FileUtil.java | 15 ---- .../logfeeder/util/LogFeederHDFSUtil.java | 77 ++++++++++++++++++++ .../ambari/logfeeder/util/LogFeederUtil.java | 41 +++++++++-- .../logfeeder/util/LogfeederHDFSUtil.java | 77 -------------------- .../apache/ambari/logfeeder/util/SSLUtil.java | 9 +++ .../ambari/logfeeder/filter/FilterJSONTest.java | 8 +- .../logfeeder/input/InputManagerTest.java | 26 ------- .../logfeeder/metrics/MetricsManagerTest.java | 2 - .../src/test/resources/logfeeder.properties | 1 - .../configurer/LogSearchConfigConfigurer.java | 2 +- 35 files changed, 500 insertions(+), 340 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/5c9bdbfe/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java index ad1f5d4..6c5cefd 100644 --- a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java +++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java @@ -43,9 +43,10 @@ public interface LogSearchConfig extends Closeable { * * @param component The component which will use the configuration. * @param properties The properties of that component. + * @param clusterName The name of the cluster, only need to be specified in LOGFEEDER mode (null for SERVER mode). 
* @throws Exception */ - void init(Component component, Map<String, String> properties) throws Exception; + void init(Component component, Map<String, String> properties, String clusterName) throws Exception; /** * Returns all the service names with input configurations of a cluster. Will be used only in SERVER mode. @@ -134,7 +135,9 @@ public interface LogSearchConfig extends Closeable { * * @param inputConfigMonitor The input config monitor to call in case of an input config change. * @param logLevelFilterMonitor The log level filter monitor to call in case of a log level filter change. + * @param clusterName The name of the cluster, only need to be specified in LOGFEEDER mode (null for SERVER mode). * @throws Exception */ - void monitorInputConfigChanges(InputConfigMonitor inputConfigMonitor, LogLevelFilterMonitor logLevelFilterMonitor) throws Exception; + void monitorInputConfigChanges(InputConfigMonitor inputConfigMonitor, LogLevelFilterMonitor logLevelFilterMonitor, + String clusterName) throws Exception; } http://git-wip-us.apache.org/repos/asf/ambari/blob/5c9bdbfe/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactory.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactory.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactory.java index 947e7e7..77b48eb 100644 --- a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactory.java +++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactory.java @@ -37,12 +37,13 @@ public class LogSearchConfigFactory { * @param component The component of the Log Search Service to create the configuration for 
(SERVER/LOGFEEDER). * @param properties The properties of the component for which the configuration is created. If the properties contain the * "logsearch.config.class" entry than the class defined there would be used instead of the default class. + * @param clusterName The name of the cluster, only need to be specified in LOGFEEDER mode (null for SERVER mode). * @param defaultClass The default configuration class to use if not specified otherwise. * @return The Log Search Configuration instance. * @throws Exception Throws exception if the defined class does not implement LogSearchConfig, or doesn't have an empty * constructor, or throws an exception in it's init method. */ - public static LogSearchConfig createLogSearchConfig(Component component, Map<String, String> properties, + public static LogSearchConfig createLogSearchConfig(Component component, Map<String, String> properties, String clusterName, Class<? extends LogSearchConfig> defaultClass) throws Exception { try { LogSearchConfig logSearchConfig = null; @@ -52,13 +53,14 @@ public class LogSearchConfigFactory { if (LogSearchConfig.class.isAssignableFrom(clazz)) { logSearchConfig = (LogSearchConfig) clazz.newInstance(); } else { - throw new IllegalArgumentException("Class " + configClassName + " does not implement the interface " + LogSearchConfig.class.getName()); + throw new IllegalArgumentException("Class " + configClassName + " does not implement the interface " + + LogSearchConfig.class.getName()); } } else { logSearchConfig = defaultClass.newInstance(); } - logSearchConfig.init(component, properties); + logSearchConfig.init(component, properties, clusterName); return logSearchConfig; } catch (Exception e) { LOG.error("Could not initialize logsearch config.", e); http://git-wip-us.apache.org/repos/asf/ambari/blob/5c9bdbfe/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass1.java 
---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass1.java b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass1.java index 7309382..28844d5 100644 --- a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass1.java +++ b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass1.java @@ -30,7 +30,7 @@ import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig; public class LogSearchConfigClass1 implements LogSearchConfig { @Override - public void init(Component component, Map<String, String> properties) {} + public void init(Component component, Map<String, String> properties, String clusterName) {} @Override public boolean inputConfigExists(String clusterName, String serviceName) throws Exception { @@ -44,8 +44,8 @@ public class LogSearchConfigClass1 implements LogSearchConfig { public void setInputConfig(String clusterName, String serviceName, String inputConfig) throws Exception {} @Override - public void monitorInputConfigChanges(InputConfigMonitor inputConfigMonitor, LogLevelFilterMonitor logLevelFilterMonitor) - throws Exception {} + public void monitorInputConfigChanges(InputConfigMonitor inputConfigMonitor, LogLevelFilterMonitor logLevelFilterMonitor, + String clusterName) throws Exception {} @Override public List<String> getServices(String clusterName) { http://git-wip-us.apache.org/repos/asf/ambari/blob/5c9bdbfe/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass2.java ---------------------------------------------------------------------- diff --git 
a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass2.java b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass2.java index f83eeef..5934fa6 100644 --- a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass2.java +++ b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass2.java @@ -30,7 +30,7 @@ import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig; public class LogSearchConfigClass2 implements LogSearchConfig { @Override - public void init(Component component, Map<String, String> properties) {} + public void init(Component component, Map<String, String> properties, String clusterName) {} @Override public boolean inputConfigExists(String clusterName, String serviceName) throws Exception { @@ -44,8 +44,8 @@ public class LogSearchConfigClass2 implements LogSearchConfig { public void setInputConfig(String clusterName, String serviceName, String inputConfig) throws Exception {} @Override - public void monitorInputConfigChanges(InputConfigMonitor inputConfigMonitor, LogLevelFilterMonitor logLevelFilterMonitor) - throws Exception {} + public void monitorInputConfigChanges(InputConfigMonitor inputConfigMonitor, LogLevelFilterMonitor logLevelFilterMonitor, + String clusterName) throws Exception {} @Override public List<String> getServices(String clusterName) { http://git-wip-us.apache.org/repos/asf/ambari/blob/5c9bdbfe/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactoryTest.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactoryTest.java 
b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactoryTest.java index 425694f..f990c5c 100644 --- a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactoryTest.java +++ b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactoryTest.java @@ -33,7 +33,7 @@ public class LogSearchConfigFactoryTest { @Test public void testDefaultConfig() throws Exception { LogSearchConfig config = LogSearchConfigFactory.createLogSearchConfig(Component.SERVER, - Collections.<String, String> emptyMap(), LogSearchConfigClass1.class); + Collections.<String, String> emptyMap(), null, LogSearchConfigClass1.class); Assert.assertSame(config.getClass(), LogSearchConfigClass1.class); } @@ -43,7 +43,7 @@ public class LogSearchConfigFactoryTest { Map<String, String> logsearchConfClassMap = new HashMap<>(); logsearchConfClassMap.put("logsearch.config.class", "org.apache.ambari.logsearch.config.api.LogSearchConfigClass2"); LogSearchConfig config = LogSearchConfigFactory.createLogSearchConfig(Component.SERVER, - logsearchConfClassMap, LogSearchConfigClass1.class); + logsearchConfClassMap, null, LogSearchConfigClass1.class); Assert.assertSame(config.getClass(), LogSearchConfigClass2.class); } @@ -53,6 +53,6 @@ public class LogSearchConfigFactoryTest { Map<String, String> logsearchConfClassMap = new HashMap<>(); logsearchConfClassMap.put("logsearch.config.class", "org.apache.ambari.logsearch.config.api.NonLogSearchConfigClass"); LogSearchConfigFactory.createLogSearchConfig(Component.SERVER, - logsearchConfClassMap, LogSearchConfigClass1.class); + logsearchConfClassMap, null, LogSearchConfigClass1.class); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/5c9bdbfe/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java 
---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java index 827101c..6d36203 100644 --- a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java +++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java @@ -72,14 +72,6 @@ public class LogSearchConfigZK implements LogSearchConfig { private static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS"; @LogSearchPropertyDescription( - name = "cluster.name", - description = "Cluster name for Log Feeder. (added into zk path of the shipper configs)", - examples = {"cl1"}, - sources = {"logfeeder.properties"} - ) - private static final String CLUSTER_NAME_PROPERTY = "cluster.name"; - - @LogSearchPropertyDescription( name = "logsearch.config.zk_connect_string", description = "ZooKeeper connection string.", examples = {"localhost1:2181,localhost2:2181/znode"}, @@ -111,7 +103,7 @@ public class LogSearchConfigZK implements LogSearchConfig { private Gson gson; @Override - public void init(Component component, Map<String, String> properties) throws Exception { + public void init(Component component, Map<String, String> properties, String clusterName) throws Exception { this.properties = properties; LOG.info("Connecting to ZooKeeper at " + properties.get(ZK_CONNECT_STRING_PROPERTY)); @@ -136,8 +128,7 @@ public class LogSearchConfigZK implements LogSearchConfig { LOG.info("Root node is not present yet, going to sleep for " + WAIT_FOR_ROOT_SLEEP_SECONDS + " seconds"); Thread.sleep(WAIT_FOR_ROOT_SLEEP_SECONDS * 1000); } - - cache = new TreeCache(client, String.format("%s/%s", root, 
properties.get(CLUSTER_NAME_PROPERTY))); + cache = new TreeCache(client, String.format("%s/%s", root, clusterName)); } gson = new GsonBuilder().setDateFormat(DATE_FORMAT).create(); @@ -169,7 +160,7 @@ public class LogSearchConfigZK implements LogSearchConfig { @Override public void monitorInputConfigChanges(final InputConfigMonitor inputConfigMonitor, - final LogLevelFilterMonitor logLevelFilterMonitor) throws Exception { + final LogLevelFilterMonitor logLevelFilterMonitor, final String clusterName) throws Exception { final JsonParser parser = new JsonParser(); final JsonArray globalConfigNode = new JsonArray(); for (String globalConfigJsonString : inputConfigMonitor.getGlobalConfigJsons()) { @@ -177,6 +168,8 @@ public class LogSearchConfigZK implements LogSearchConfig { globalConfigNode.add(globalConfigJson.getAsJsonObject().get("global")); } + createGlobalConfigNode(globalConfigNode, clusterName); + TreeCacheListener listener = new TreeCacheListener() { private final Set<Type> nodeEvents = ImmutableSet.of(Type.NODE_ADDED, Type.NODE_UPDATED, Type.NODE_REMOVED); @@ -189,7 +182,7 @@ public class LogSearchConfigZK implements LogSearchConfig { String nodeData = new String(event.getData().getData()); Type eventType = event.getType(); - String configPathStab = String.format("%s/%s/", root, properties.get(CLUSTER_NAME_PROPERTY)); + String configPathStab = String.format("%s/%s/", root, clusterName); if (event.getData().getPath().startsWith(configPathStab + "input/")) { handleInputConfigChange(eventType, nodeName, nodeData); @@ -271,12 +264,10 @@ public class LogSearchConfigZK implements LogSearchConfig { }; cache.getListenable().addListener(listener); cache.start(); - - createGlobalConfigNode(globalConfigNode); } - private void createGlobalConfigNode(JsonArray globalConfigNode) { - String globalConfigNodePath = String.format("%s/%s/global", root, properties.get(CLUSTER_NAME_PROPERTY)); + private void createGlobalConfigNode(JsonArray globalConfigNode, String clusterName) { 
+ String globalConfigNodePath = String.format("%s/%s/global", root, clusterName); String data = InputConfigGson.gson.toJson(globalConfigNode); try { http://git-wip-us.apache.org/repos/asf/ambari/blob/5c9bdbfe/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java index e7b6edc..59c2a22 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java @@ -33,7 +33,6 @@ import org.apache.ambari.logsearch.config.api.LogSearchConfig.Component; import org.apache.ambari.logsearch.config.zookeeper.LogSearchConfigZK; import org.apache.commons.io.FileUtils; import org.apache.ambari.logfeeder.input.InputConfigUploader; -import org.apache.ambari.logfeeder.input.InputManager; import org.apache.ambari.logfeeder.loglevelfilter.LogLevelFilterHandler; import org.apache.ambari.logfeeder.metrics.MetricData; import org.apache.ambari.logfeeder.metrics.MetricsManager; @@ -57,11 +56,9 @@ public class LogFeeder { private ConfigHandler configHandler = new ConfigHandler(); private LogSearchConfig config; - private InputManager inputManager = new InputManager(); private MetricsManager metricsManager = new MetricsManager(); private long lastCheckPointCleanedMS = 0; - private boolean isLogfeederCompleted = false; private Thread statLoggerThread = null; private LogFeeder(LogFeederCommandLine cli) { @@ -72,7 +69,6 @@ public class LogFeeder { try { init(); monitor(); - waitOnAllDaemonThreads(); } catch (Throwable t) { LOG.fatal("Caught exception in main.", t); System.exit(1); @@ -85,11 +81,11 @@ public class 
LogFeeder { configHandler.init(); SSLUtil.ensureStorePasswords(); - config = LogSearchConfigFactory.createLogSearchConfig(Component.LOGFEEDER, - Maps.fromProperties(LogFeederUtil.getProperties()), LogSearchConfigZK.class); + config = LogSearchConfigFactory.createLogSearchConfig(Component.LOGFEEDER, Maps.fromProperties(LogFeederUtil.getProperties()), + LogFeederUtil.getClusterName(), LogSearchConfigZK.class); LogLevelFilterHandler.init(config); InputConfigUploader.load(config); - config.monitorInputConfigChanges(configHandler, new LogLevelFilterHandler()); + config.monitorInputConfigChanges(configHandler, new LogLevelFilterHandler(), LogFeederUtil.getClusterName()); metricsManager.init(); @@ -117,8 +113,8 @@ public class LogFeeder { } private void monitor() throws Exception { - JVMShutdownHook logfeederJVMHook = new JVMShutdownHook(); - ShutdownHookManager.get().addShutdownHook(logfeederJVMHook, LOGFEEDER_SHUTDOWN_HOOK_PRIORITY); + JVMShutdownHook logFeederJVMHook = new JVMShutdownHook(); + ShutdownHookManager.get().addShutdownHook(logFeederJVMHook, LOGFEEDER_SHUTDOWN_HOOK_PRIORITY); statLoggerThread = new Thread("statLogger") { @@ -140,10 +136,6 @@ public class LogFeeder { lastCheckPointCleanedMS = System.currentTimeMillis(); configHandler.cleanCheckPointFiles(); } - - if (isLogfeederCompleted) { - break; - } } } @@ -163,20 +155,6 @@ public class LogFeeder { } } - private void waitOnAllDaemonThreads() { - if ("true".equals(LogFeederUtil.getStringProperty("foreground"))) { - inputManager.waitOnAllInputs(); - isLogfeederCompleted = true; - if (statLoggerThread != null) { - try { - statLoggerThread.join(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - } - } - public void test() { try { LogManager.shutdown(); @@ -203,7 +181,7 @@ public class LogFeeder { if (cli.isMonitor()) { try { - LogFeederUtil.loadProperties("logfeeder.properties"); + LogFeederUtil.loadProperties(); } catch (Throwable t) { LOG.warn("Could not load logfeeder properites"); 
System.exit(1); http://git-wip-us.apache.org/repos/asf/ambari/blob/5c9bdbfe/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java index 25669d9..5bf074c 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java @@ -41,15 +41,14 @@ import org.apache.ambari.logfeeder.metrics.MetricData; import org.apache.ambari.logfeeder.output.Output; import org.apache.ambari.logfeeder.output.OutputManager; import org.apache.ambari.logfeeder.util.AliasUtil; -import org.apache.ambari.logfeeder.util.FileUtil; import org.apache.ambari.logfeeder.util.LogFeederUtil; -import org.apache.commons.collections.CollectionUtils; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; import org.apache.commons.lang.BooleanUtils; import org.apache.commons.lang3.StringUtils; import org.apache.ambari.logfeeder.util.AliasUtil.AliasType; import org.apache.ambari.logsearch.config.api.InputConfigMonitor; +import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription; import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterDescriptor; import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterGrokDescriptor; import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig; @@ -62,9 +61,30 @@ import org.apache.log4j.Logger; import com.google.gson.reflect.TypeToken; +import static org.apache.ambari.logfeeder.util.LogFeederUtil.LOGFEEDER_PROPERTIES_FILE; + public class ConfigHandler implements 
InputConfigMonitor { private static final Logger LOG = Logger.getLogger(ConfigHandler.class); + @LogSearchPropertyDescription( + name = "logfeeder.config.files", + description = "Comma separated list of the config files containing global / output configurations.", + examples = {"global.json,output.json", "/etc/ambari-logsearch-logfeeder/conf/global.json"}, + defaultValue = "", + sources = {LOGFEEDER_PROPERTIES_FILE} + ) + private static final String CONFIG_FILES_PROPERTY = "logfeeder.config.files"; + + private static final int DEFAULT_SIMULATE_INPUT_NUMBER = 0; + @LogSearchPropertyDescription( + name = "logfeeder.simulate.input_number", + description = "The number of the simulator instances to run with. O means no simulation.", + examples = {"10"}, + defaultValue = DEFAULT_SIMULATE_INPUT_NUMBER + "", + sources = {LOGFEEDER_PROPERTIES_FILE} + ) + private static final String SIMULATE_INPUT_NUMBER_PROPERTY = "logfeeder.simulate.input_number"; + private final OutputManager outputManager = new OutputManager(); private final InputManager inputManager = new InputManager(); @@ -108,24 +128,10 @@ public class ConfigHandler implements InputConfigMonitor { private List<String> getConfigFiles() { List<String> configFiles = new ArrayList<>(); - String logfeederConfigFilesProperty = LogFeederUtil.getStringProperty("logfeeder.config.files"); - LOG.info("logfeeder.config.files=" + logfeederConfigFilesProperty); - if (logfeederConfigFilesProperty != null) { - configFiles.addAll(Arrays.asList(logfeederConfigFilesProperty.split(","))); - } - - String inputConfigDir = LogFeederUtil.getStringProperty("input_config_dir"); - if (StringUtils.isNotEmpty(inputConfigDir)) { - File configDirFile = new File(inputConfigDir); - List<File> inputConfigFiles = FileUtil.getAllFileFromDir(configDirFile, "json", false); - for (File inputConfigFile : inputConfigFiles) { - configFiles.add(inputConfigFile.getAbsolutePath()); - } - } - - if (CollectionUtils.isEmpty(configFiles)) { - String 
configFileProperty = LogFeederUtil.getStringProperty("config.file", "config.json"); - configFiles.addAll(Arrays.asList(configFileProperty.split(","))); + String logFeederConfigFilesProperty = LogFeederUtil.getStringProperty(CONFIG_FILES_PROPERTY); + LOG.info("logfeeder.config.files=" + logFeederConfigFilesProperty); + if (logFeederConfigFilesProperty != null) { + configFiles.addAll(Arrays.asList(logFeederConfigFilesProperty.split(","))); } return configFiles; @@ -225,7 +231,7 @@ public class ConfigHandler implements InputConfigMonitor { } private void simulateIfNeeded() throws Exception { - int simulatedInputNumber = LogFeederUtil.getIntProperty("logfeeder.simulate.input_number", 0); + int simulatedInputNumber = LogFeederUtil.getIntProperty(SIMULATE_INPUT_NUMBER_PROPERTY, DEFAULT_SIMULATE_INPUT_NUMBER); if (simulatedInputNumber == 0) return; @@ -434,10 +440,6 @@ public class ConfigHandler implements InputConfigMonitor { outputManager.addMetricsContainers(metricsList); } - public void waitOnAllInputs() { - inputManager.waitOnAllInputs(); - } - public void close() { inputManager.close(); outputManager.close(); http://git-wip-us.apache.org/repos/asf/ambari/blob/5c9bdbfe/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederException.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederException.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederException.java new file mode 100644 index 0000000..3653475 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ambari.logfeeder.common; + +public class LogFeederException extends Exception { + + public LogFeederException(String message, Throwable throwable) { + super(message, throwable); + } + + public LogFeederException(String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/5c9bdbfe/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogfeederException.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogfeederException.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogfeederException.java deleted file mode 100644 index 8a07602..0000000 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogfeederException.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.ambari.logfeeder.common; - -public class LogfeederException extends Exception { - - public LogfeederException(String message, Throwable throwable) { - super(message, throwable); - } - - public LogfeederException(String message) { - super(message); - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/5c9bdbfe/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java index fd02497..8e8834b 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java @@ -25,7 +25,7 @@ import java.util.List; import java.util.Map; import org.apache.ambari.logfeeder.common.ConfigItem; -import org.apache.ambari.logfeeder.common.LogfeederException; +import org.apache.ambari.logfeeder.common.LogFeederException; import org.apache.ambari.logfeeder.input.Input; import org.apache.ambari.logfeeder.input.InputMarker; import org.apache.ambari.logfeeder.mapper.Mapper; @@ -116,7 +116,7 
@@ public abstract class Filter extends ConfigItem { /** * Deriving classes should implement this at the minimum */ - public void apply(String inputStr, InputMarker inputMarker) throws LogfeederException { + public void apply(String inputStr, InputMarker inputMarker) throws LogFeederException { // TODO: There is no transformation for string types. if (nextFilter != null) { nextFilter.apply(inputStr, inputMarker); @@ -125,7 +125,7 @@ public abstract class Filter extends ConfigItem { } } - public void apply(Map<String, Object> jsonObj, InputMarker inputMarker) throws LogfeederException { + public void apply(Map<String, Object> jsonObj, InputMarker inputMarker) throws LogFeederException { for (String fieldName : postFieldValueMappers.keySet()) { Object value = jsonObj.get(fieldName); if (value != null) { http://git-wip-us.apache.org/repos/asf/ambari/blob/5c9bdbfe/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java index 50247e2..fc7a565 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java @@ -34,7 +34,7 @@ import java.util.regex.Pattern; import oi.thekraken.grok.api.Grok; import oi.thekraken.grok.api.exception.GrokException; -import org.apache.ambari.logfeeder.common.LogfeederException; +import org.apache.ambari.logfeeder.common.LogFeederException; import org.apache.ambari.logfeeder.input.InputMarker; import org.apache.ambari.logfeeder.metrics.MetricData; import org.apache.ambari.logfeeder.util.LogFeederUtil; @@ -160,7 +160,7 @@ 
public class FilterGrok extends Filter { } @Override - public void apply(String inputStr, InputMarker inputMarker) throws LogfeederException { + public void apply(String inputStr, InputMarker inputMarker) throws LogFeederException { if (grokMessage == null) { return; } @@ -195,7 +195,7 @@ public class FilterGrok extends Filter { } @Override - public void apply(Map<String, Object> jsonObj, InputMarker inputMarker) throws LogfeederException { + public void apply(Map<String, Object> jsonObj, InputMarker inputMarker) throws LogFeederException { if (sourceField != null) { savedInputMarker = inputMarker; applyMessage((String) jsonObj.get(sourceField), jsonObj, null); @@ -205,7 +205,7 @@ public class FilterGrok extends Filter { } } - private void applyMessage(String inputStr, Map<String, Object> jsonObj, String multilineJsonStr) throws LogfeederException { + private void applyMessage(String inputStr, Map<String, Object> jsonObj, String multilineJsonStr) throws LogFeederException { String jsonStr = grokMessage.capture(inputStr); boolean parseError = false; @@ -260,7 +260,7 @@ public class FilterGrok extends Filter { Map<String, Object> jsonObj = Collections.synchronizedMap(new HashMap<String, Object>()); try { applyMessage(strBuff.toString(), jsonObj, currMultilineJsonStr); - } catch (LogfeederException e) { + } catch (LogFeederException e) { LOG.error(e.getLocalizedMessage(), e.getCause()); } strBuff = null; http://git-wip-us.apache.org/repos/asf/ambari/blob/5c9bdbfe/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java index cfccdeb..1a2da0c 100644 --- 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java @@ -21,7 +21,7 @@ package org.apache.ambari.logfeeder.filter; import java.util.Map; import org.apache.ambari.logfeeder.common.LogFeederConstants; -import org.apache.ambari.logfeeder.common.LogfeederException; +import org.apache.ambari.logfeeder.common.LogFeederException; import org.apache.ambari.logfeeder.input.InputMarker; import org.apache.ambari.logfeeder.util.DateUtil; import org.apache.ambari.logfeeder.util.LogFeederUtil; @@ -29,13 +29,13 @@ import org.apache.ambari.logfeeder.util.LogFeederUtil; public class FilterJSON extends Filter { @Override - public void apply(String inputStr, InputMarker inputMarker) throws LogfeederException { + public void apply(String inputStr, InputMarker inputMarker) throws LogFeederException { Map<String, Object> jsonMap = null; try { jsonMap = LogFeederUtil.toJSONObject(inputStr); } catch (Exception e) { LOG.error(e.getLocalizedMessage()); - throw new LogfeederException("Json parsing failed for inputstr = " + inputStr ,e.getCause()); + throw new LogFeederException("Json parsing failed for inputstr = " + inputStr ,e.getCause()); } Double lineNumberD = (Double) jsonMap.get("line_number"); if (lineNumberD != null) { http://git-wip-us.apache.org/repos/asf/ambari/blob/5c9bdbfe/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java index f2a4186..670b1c3 100644 --- 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java @@ -24,7 +24,7 @@ import java.util.List; import java.util.Map; import java.util.regex.Pattern; -import org.apache.ambari.logfeeder.common.LogfeederException; +import org.apache.ambari.logfeeder.common.LogFeederException; import org.apache.ambari.logfeeder.input.InputMarker; import org.apache.ambari.logfeeder.metrics.MetricData; import org.apache.ambari.logfeeder.util.LogFeederUtil; @@ -58,12 +58,12 @@ public class FilterKeyValue extends Filter { } @Override - public void apply(String inputStr, InputMarker inputMarker) throws LogfeederException { + public void apply(String inputStr, InputMarker inputMarker) throws LogFeederException { apply(LogFeederUtil.toJSONObject(inputStr), inputMarker); } @Override - public void apply(Map<String, Object> jsonObj, InputMarker inputMarker) throws LogfeederException { + public void apply(Map<String, Object> jsonObj, InputMarker inputMarker) throws LogFeederException { if (sourceField == null) { return; } http://git-wip-us.apache.org/repos/asf/ambari/blob/5c9bdbfe/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java index 2359256..9535260 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java @@ -68,7 +68,8 @@ public abstract class 
AbstractInputFile extends Input { public void init() throws Exception { LOG.info("init() called"); - checkPointExtension = LogFeederUtil.getStringProperty("logfeeder.checkpoint.extension", InputManager.DEFAULT_CHECKPOINT_EXTENSION); + checkPointExtension = LogFeederUtil.getStringProperty(InputManager.CHECKPOINT_EXTENSION_PROPERTY, + InputManager.DEFAULT_CHECKPOINT_EXTENSION); // Let's close the file and set it to true after we start monitoring it setClosed(true); http://git-wip-us.apache.org/repos/asf/ambari/blob/5c9bdbfe/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java index 49151e7..8050263 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java @@ -26,12 +26,13 @@ import java.util.Map; import org.apache.ambari.logfeeder.input.cache.LRUCache; import org.apache.ambari.logfeeder.common.ConfigItem; -import org.apache.ambari.logfeeder.common.LogfeederException; +import org.apache.ambari.logfeeder.common.LogFeederException; import org.apache.ambari.logfeeder.filter.Filter; import org.apache.ambari.logfeeder.metrics.MetricData; import org.apache.ambari.logfeeder.output.Output; import org.apache.ambari.logfeeder.output.OutputManager; import org.apache.ambari.logfeeder.util.LogFeederUtil; +import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription; import org.apache.ambari.logsearch.config.api.model.inputconfig.Conditions; import org.apache.ambari.logsearch.config.api.model.inputconfig.Fields; import 
org.apache.ambari.logsearch.config.api.model.inputconfig.FilterDescriptor; @@ -39,15 +40,62 @@ import org.apache.ambari.logsearch.config.api.model.inputconfig.InputDescriptor; import org.apache.commons.lang.BooleanUtils; import org.apache.log4j.Priority; +import static org.apache.ambari.logfeeder.util.LogFeederUtil.LOGFEEDER_PROPERTIES_FILE; + public abstract class Input extends ConfigItem implements Runnable { - private static final boolean DEFAULT_TAIL = true; - private static final boolean DEFAULT_USE_EVENT_MD5 = false; - private static final boolean DEFAULT_GEN_EVENT_MD5 = true; private static final boolean DEFAULT_CACHE_ENABLED = false; - private static final boolean DEFAULT_CACHE_DEDUP_LAST = false; + @LogSearchPropertyDescription( + name = "logfeeder.cache.enabled", + description = "Enables the usage of a cache to avoid duplications.", + examples = {"true"}, + defaultValue = DEFAULT_CACHE_ENABLED + "", + sources = {LOGFEEDER_PROPERTIES_FILE} + ) + private static final String CACHE_ENABLED_PROPERTY = "logfeeder.cache.enabled"; + + private static final String DEFAULT_CACHE_KEY_FIELD = "log_message"; + @LogSearchPropertyDescription( + name = "logfeeder.cache.key.field", + description = "The field whose value should be cached and should be checked for repetitions.", + examples = {"some_field_prone_to_repeating_value"}, + defaultValue = DEFAULT_CACHE_KEY_FIELD, + sources = {LOGFEEDER_PROPERTIES_FILE} + ) + private static final String CACHE_KEY_FIELD_PROPERTY = "logfeeder.cache.key.field"; + private static final int DEFAULT_CACHE_SIZE = 100; + @LogSearchPropertyDescription( + name = "logfeeder.cache.size", + description = "The number of log entries to cache in order to avoid duplications.", + examples = {"50"}, + defaultValue = DEFAULT_CACHE_SIZE + "", + sources = {LOGFEEDER_PROPERTIES_FILE} + ) + private static final String CACHE_SIZE_PROPERTY = "logfeeder.cache.size"; + + private static final boolean DEFAULT_CACHE_LAST_DEDUP_ENABLED = false; +
@LogSearchPropertyDescription( + name = "logfeeder.cache.last.dedup.enabled", + description = "Enable filtering directly repeating log entries irrelevant of the time spent between them.", + examples = {"true"}, + defaultValue = DEFAULT_CACHE_LAST_DEDUP_ENABLED + "", + sources = {LOGFEEDER_PROPERTIES_FILE} + ) + private static final String CACHE_LAST_DEDUP_ENABLED_PROPERTY = "logfeeder.cache.last.dedup.enabled"; + private static final long DEFAULT_CACHE_DEDUP_INTERVAL = 1000; - private static final String DEFAULT_CACHE_KEY_FIELD = "log_message"; + @LogSearchPropertyDescription( + name = "logfeeder.cache.dedup.interval", + description = "Maximum number of milliseconds between two identical messages to be filtered out.", + examples = {"500"}, + defaultValue = DEFAULT_CACHE_DEDUP_INTERVAL + "", + sources = {LOGFEEDER_PROPERTIES_FILE} + ) + private static final String CACHE_DEDUP_INTERVAL_PROPERTY = "logfeeder.cache.dedup.interval"; + + private static final boolean DEFAULT_TAIL = true; + private static final boolean DEFAULT_USE_EVENT_MD5 = false; + private static final boolean DEFAULT_GEN_EVENT_MD5 = true; protected InputDescriptor inputDescriptor; @@ -183,7 +231,7 @@ public abstract class Input extends ConfigItem implements Runnable { if (firstFilter != null) { try { firstFilter.apply(line, marker); - } catch (LogfeederException e) { + } catch (LogFeederException e) { LOG.error(e.getLocalizedMessage(), e); } } else { @@ -246,25 +294,25 @@ public abstract class Input extends ConfigItem implements Runnable { private void initCache() { boolean cacheEnabled = inputDescriptor.isCacheEnabled() != null ? inputDescriptor.isCacheEnabled() - : LogFeederUtil.getBooleanProperty("logfeeder.cache.enabled", DEFAULT_CACHE_ENABLED); + : LogFeederUtil.getBooleanProperty(CACHE_ENABLED_PROPERTY, DEFAULT_CACHE_ENABLED); if (cacheEnabled) { String cacheKeyField = inputDescriptor.getCacheKeyField() != null ? 
inputDescriptor.getCacheKeyField() - : LogFeederUtil.getStringProperty("logfeeder.cache.key.field", DEFAULT_CACHE_KEY_FIELD); + : LogFeederUtil.getStringProperty(CACHE_KEY_FIELD_PROPERTY, DEFAULT_CACHE_KEY_FIELD); setCacheKeyField(cacheKeyField); boolean cacheLastDedupEnabled = inputDescriptor.getCacheLastDedupEnabled() != null ? inputDescriptor.getCacheLastDedupEnabled() - : LogFeederUtil.getBooleanProperty("logfeeder.cache.last.dedup.enabled", DEFAULT_CACHE_DEDUP_LAST); + : LogFeederUtil.getBooleanProperty(CACHE_LAST_DEDUP_ENABLED_PROPERTY, DEFAULT_CACHE_LAST_DEDUP_ENABLED); int cacheSize = inputDescriptor.getCacheSize() != null ? inputDescriptor.getCacheSize() - : LogFeederUtil.getIntProperty("logfeeder.cache.size", DEFAULT_CACHE_SIZE); + : LogFeederUtil.getIntProperty(CACHE_SIZE_PROPERTY, DEFAULT_CACHE_SIZE); long cacheDedupInterval = inputDescriptor.getCacheDedupInterval() != null ? inputDescriptor.getCacheDedupInterval() - : Long.parseLong(LogFeederUtil.getStringProperty("logfeeder.cache.dedup.interval", String.valueOf(DEFAULT_CACHE_DEDUP_INTERVAL))); + : Long.parseLong(LogFeederUtil.getStringProperty(CACHE_DEDUP_INTERVAL_PROPERTY, String.valueOf(DEFAULT_CACHE_DEDUP_INTERVAL))); setCache(new LRUCache(cacheSize, filePath, cacheDedupInterval, cacheLastDedupEnabled)); } http://git-wip-us.apache.org/repos/asf/ambari/blob/5c9bdbfe/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java index 8aec690..09fc3f5 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java +++ 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java @@ -28,14 +28,25 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.ambari.logsearch.config.api.LogSearchConfig; +import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription; import org.apache.ambari.logfeeder.util.LogFeederUtil; import org.apache.log4j.Logger; import com.google.common.io.Files; +import static org.apache.ambari.logfeeder.util.LogFeederUtil.LOGFEEDER_PROPERTIES_FILE; + public class InputConfigUploader extends Thread { protected static final Logger LOG = Logger.getLogger(InputConfigUploader.class); + @LogSearchPropertyDescription( + name = "logfeeder.config.dir", + description = "The directory where shipper configuration files are looked for.", + examples = {"/etc/ambari-logsearch-logfeeder/conf"}, + sources = {LOGFEEDER_PROPERTIES_FILE} + ) + private static final String CONFIG_DIR_PROPERTY = "logfeeder.config.dir"; + private static final long SLEEP_BETWEEN_CHECK = 2000; private final File configDir; @@ -48,7 +59,6 @@ public class InputConfigUploader extends Thread { private final Set<String> filesHandled = new HashSet<>(); private final Pattern serviceNamePattern = Pattern.compile("input.config-(.+).json"); private final LogSearchConfig config; - private final String clusterName = LogFeederUtil.getStringProperty("cluster.name"); public static void load(LogSearchConfig config) { new InputConfigUploader(config).start(); @@ -58,7 +68,7 @@ public class InputConfigUploader extends Thread { super("Input Config Loader"); setDaemon(true); - this.configDir = new File(LogFeederUtil.getStringProperty("logfeeder.config.dir")); + this.configDir = new File(LogFeederUtil.getStringProperty(CONFIG_DIR_PROPERTY)); this.config = config; } @@ -74,8 +84,8 @@ public class InputConfigUploader extends Thread { String serviceName = m.group(1); String inputConfig = Files.toString(inputConfigFile, 
Charset.defaultCharset()); - if (!config.inputConfigExists(clusterName, serviceName)) { - config.createInputConfig(clusterName, serviceName, inputConfig); + if (!config.inputConfigExists(LogFeederUtil.getClusterName(), serviceName)) { + config.createInputConfig(LogFeederUtil.getClusterName(), serviceName, inputConfig); } filesHandled.add(inputConfigFile.getAbsolutePath()); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/ambari/blob/5c9bdbfe/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java index 01a11ec..091015a 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java @@ -36,16 +36,36 @@ import java.util.UUID; import org.apache.ambari.logfeeder.metrics.MetricData; import org.apache.ambari.logfeeder.util.FileUtil; import org.apache.ambari.logfeeder.util.LogFeederUtil; +import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription; import org.apache.commons.io.filefilter.WildcardFileFilter; import org.apache.commons.lang3.StringUtils; import org.apache.log4j.Logger; import org.apache.solr.common.util.Base64; +import static org.apache.ambari.logfeeder.util.LogFeederUtil.LOGFEEDER_PROPERTIES_FILE; + public class InputManager { private static final Logger LOG = Logger.getLogger(InputManager.class); - private static final String CHECKPOINT_SUBFOLDER_NAME = "logfeeder_checkpoints"; public static final String DEFAULT_CHECKPOINT_EXTENSION = ".cp"; + @LogSearchPropertyDescription( + name =
"logfeeder.checkpoint.extension", + description = "The extension used for checkpoint files.", + examples = {"ckp"}, + defaultValue = DEFAULT_CHECKPOINT_EXTENSION, + sources = {LOGFEEDER_PROPERTIES_FILE} + ) + public static final String CHECKPOINT_EXTENSION_PROPERTY = "logfeeder.checkpoint.extension"; + + @LogSearchPropertyDescription( + name = "logfeeder.checkpoint.folder", + description = "The folder where checkpoint files are stored.", + examples = {"/etc/ambari-logsearch-logfeeder/conf/checkpoints"}, + sources = {LOGFEEDER_PROPERTIES_FILE} + ) + private static final String CHECKPOINT_FOLDER_PROPERTY = "logfeeder.checkpoint.folder"; + + private static final String CHECKPOINT_SUBFOLDER_NAME = "logfeeder_checkpoints"; private Map<String, List<Input>> inputs = new HashMap<>(); private Set<Input> notReadyList = new HashSet<Input>(); @@ -118,32 +138,21 @@ public class InputManager { } private void initCheckPointSettings() { - checkPointExtension = LogFeederUtil.getStringProperty("logfeeder.checkpoint.extension", DEFAULT_CHECKPOINT_EXTENSION); + checkPointExtension = LogFeederUtil.getStringProperty(CHECKPOINT_EXTENSION_PROPERTY, DEFAULT_CHECKPOINT_EXTENSION); LOG.info("Determining valid checkpoint folder"); boolean isCheckPointFolderValid = false; // We need to keep track of the files we are reading. - String checkPointFolder = LogFeederUtil.getStringProperty("logfeeder.checkpoint.folder"); + String checkPointFolder = LogFeederUtil.getStringProperty(CHECKPOINT_FOLDER_PROPERTY); if (!StringUtils.isEmpty(checkPointFolder)) { checkPointFolderFile = new File(checkPointFolder); isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile); } - if (!isCheckPointFolderValid) { - // Let's try home folder - String userHome = LogFeederUtil.getStringProperty("user.home"); - if (userHome != null) { - checkPointFolderFile = new File(userHome, CHECKPOINT_SUBFOLDER_NAME); - LOG.info("Checking if home folder can be used for checkpoints.
Folder=" + checkPointFolderFile); - isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile); - } - } + if (!isCheckPointFolderValid) { // Let's use tmp folder - String tmpFolder = LogFeederUtil.getStringProperty("java.io.tmpdir"); - if (tmpFolder == null) { - tmpFolder = "/tmp"; - } + String tmpFolder = LogFeederUtil.getLogFeederTempDir(); checkPointFolderFile = new File(tmpFolder, CHECKPOINT_SUBFOLDER_NAME); - LOG.info("Checking if tmps folder can be used for checkpoints. Folder=" + checkPointFolderFile); + LOG.info("Checking if tmp folder can be used for checkpoints. Folder=" + checkPointFolderFile); isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile); if (isCheckPointFolderValid) { LOG.warn("Using tmp folder " + checkPointFolderFile + " to store check points. This is not recommended." + @@ -153,6 +162,8 @@ public class InputManager { if (isCheckPointFolderValid) { LOG.info("Using folder " + checkPointFolderFile + " for storing checkpoints"); + } else { + throw new IllegalStateException("Could not determine the checkpoint folder."); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/5c9bdbfe/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java index 5e7bdb3..f1002ae 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java @@ -34,6 +34,7 @@ import org.apache.ambari.logfeeder.filter.Filter; import org.apache.ambari.logfeeder.filter.FilterJSON; import 
org.apache.ambari.logfeeder.output.Output; import org.apache.ambari.logfeeder.util.LogFeederUtil; +import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription; import org.apache.ambari.logsearch.config.api.model.inputconfig.InputDescriptor; import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.FilterJsonDescriptorImpl; import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputDescriptorImpl; @@ -42,9 +43,70 @@ import org.apache.solr.common.util.Base64; import com.google.common.base.Joiner; +import static org.apache.ambari.logfeeder.util.LogFeederUtil.LOGFEEDER_PROPERTIES_FILE; + public class InputSimulate extends Input { private static final String LOG_TEXT_PATTERN = "{ logtime=\"%d\", level=\"%s\", log_message=\"%s\", host=\"%s\"}"; + private static final String DEFAULT_LOG_LEVEL = "WARN"; + @LogSearchPropertyDescription( + name = "logfeeder.simulate.log_level", + description = "The log level to create the simulated log entries with.", + examples = {"INFO"}, + defaultValue = DEFAULT_LOG_LEVEL, + sources = {LOGFEEDER_PROPERTIES_FILE} + ) + private static final String LOG_LEVEL_PROPERTY = "logfeeder.simulate.log_level"; + + private static final int DEFAULT_NUMBER_OF_WORDS = 1000; + @LogSearchPropertyDescription( + name = "logfeeder.simulate.number_of_words", + description = "The size of the set of words that may be used to create the simulated log entries with.", + examples = {"100"}, + defaultValue = DEFAULT_NUMBER_OF_WORDS + "", + sources = {LOGFEEDER_PROPERTIES_FILE} + ) + private static final String NUMBER_OF_WORDS_PROPERTY = "logfeeder.simulate.number_of_words"; + + private static final int DEFAULT_MIN_LOG_WORDS = 5; + @LogSearchPropertyDescription( + name = "logfeeder.simulate.min_log_words", + description = "The minimum number of words in a simulated log entry.", + examples = {"3"}, + defaultValue = DEFAULT_MIN_LOG_WORDS + "", + sources = {LOGFEEDER_PROPERTIES_FILE} + ) + private static final String 
MIN_LOG_WORDS_PROPERTY = "logfeeder.simulate.min_log_words"; + + private static final int DEFAULT_MAX_LOG_WORDS = 10; + @LogSearchPropertyDescription( + name = "logfeeder.simulate.max_log_words", + description = "The maximum number of words in a simulated log entry.", + examples = {"15"}, + defaultValue = DEFAULT_MAX_LOG_WORDS + "", + sources = {LOGFEEDER_PROPERTIES_FILE} + ) + private static final String MAX_LOG_WORDS_PROPERTY = "logfeeder.simulate.max_log_words"; + + private static final int DEFAULT_SLEEP_MILLISECONDS = 10000; + @LogSearchPropertyDescription( + name = "logfeeder.simulate.sleep_milliseconds", + description = "The milliseconds to sleep between creating two simulated log entries.", + examples = {"5000"}, + defaultValue = DEFAULT_SLEEP_MILLISECONDS + "", + sources = {LOGFEEDER_PROPERTIES_FILE} + ) + private static final String SLEEP_MILLISECONDS_PROPERTY = "logfeeder.simulate.sleep_milliseconds"; + + @LogSearchPropertyDescription( + name = "logfeeder.simulate.log_ids", + description = "The comma separated list of log ids for which to create the simulated log entries.", + examples = {"ambari_server,zookeeper,infra_solr,logsearch_app"}, + defaultValue = "The log ids of the installed services in the cluster", + sources = {LOGFEEDER_PROPERTIES_FILE} + ) + private static final String LOG_IDS_PROPERTY = "logfeeder.simulate.log_ids"; + private static final Map<String, String> typeToFilePath = new HashMap<>(); private static final List<String> inputTypes = new ArrayList<>(); public static void loadTypeToFilePath(List<InputDescriptor> inputList) { @@ -75,11 +137,11 @@ public class InputSimulate extends Input { public InputSimulate() throws Exception { this.types = getSimulatedLogTypes(); - this.level = LogFeederUtil.getStringProperty("logfeeder.simulate.log_level", "WARN"); - this.numberOfWords = LogFeederUtil.getIntProperty("logfeeder.simulate.number_of_words", 1000, 50, 1000000); - this.minLogWords =
LogFeederUtil.getIntProperty("logfeeder.simulate.min_log_words", 5, 1, 10); - this.maxLogWords = LogFeederUtil.getIntProperty("logfeeder.simulate.max_log_words", 10, 10, 20); - this.sleepMillis = LogFeederUtil.getIntProperty("logfeeder.simulate.sleep_milliseconds", 10000); + this.level = LogFeederUtil.getStringProperty(LOG_LEVEL_PROPERTY, DEFAULT_LOG_LEVEL); + this.numberOfWords = LogFeederUtil.getIntProperty(NUMBER_OF_WORDS_PROPERTY, DEFAULT_NUMBER_OF_WORDS, 50, 1000000); + this.minLogWords = LogFeederUtil.getIntProperty(MIN_LOG_WORDS_PROPERTY, DEFAULT_MIN_LOG_WORDS, 1, 10); + this.maxLogWords = LogFeederUtil.getIntProperty(MAX_LOG_WORDS_PROPERTY, DEFAULT_MAX_LOG_WORDS, 10, 20); + this.sleepMillis = LogFeederUtil.getIntProperty(SLEEP_MILLISECONDS_PROPERTY, DEFAULT_SLEEP_MILLISECONDS); this.host = "#" + hostNumber.incrementAndGet() + "-" + LogFeederUtil.hostName; Filter filter = new FilterJSON(); @@ -89,7 +151,7 @@ public class InputSimulate extends Input { } private List<String> getSimulatedLogTypes() { - String logsToSimulate = LogFeederUtil.getStringProperty("logfeeder.simulate.log_ids"); + String logsToSimulate = LogFeederUtil.getStringProperty(LOG_IDS_PROPERTY); return (logsToSimulate == null) ? 
inputTypes : Arrays.asList(logsToSimulate.split(",")); http://git-wip-us.apache.org/repos/asf/ambari/blob/5c9bdbfe/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/LogLevelFilterHandler.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/LogLevelFilterHandler.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/LogLevelFilterHandler.java index 8a4d953..79bf5ea 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/LogLevelFilterHandler.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/LogLevelFilterHandler.java @@ -32,14 +32,35 @@ import org.apache.ambari.logfeeder.common.LogFeederConstants; import org.apache.ambari.logfeeder.util.LogFeederUtil; import org.apache.ambari.logsearch.config.api.LogLevelFilterMonitor; import org.apache.ambari.logsearch.config.api.LogSearchConfig; +import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription; import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.StringUtils; import org.apache.log4j.Logger; +import static org.apache.ambari.logfeeder.util.LogFeederUtil.LOGFEEDER_PROPERTIES_FILE; + public class LogLevelFilterHandler implements LogLevelFilterMonitor { private static final Logger LOG = Logger.getLogger(LogLevelFilterHandler.class); + private static final boolean DEFAULT_LOG_FILTER_ENABLE = false; + @LogSearchPropertyDescription( + name = "logfeeder.log.filter.enable", + description = "Enables the filtering of the log entries by log level filters.", + examples = {"true"}, + defaultValue = DEFAULT_LOG_FILTER_ENABLE + "", + sources = 
{LOGFEEDER_PROPERTIES_FILE} + ) + private static final String LOG_FILTER_ENABLE_PROPERTY = "logfeeder.log.filter.enable"; + + @LogSearchPropertyDescription( + name = "logfeeder.include.default.level", + description = "Comma separated list of the default log levels to be enabled by the filtering.", + examples = {"FATAL,ERROR,WARN"}, + sources = {LOGFEEDER_PROPERTIES_FILE} + ) + private static final String INCLUDE_DEFAULT_LEVEL_PROPERTY = "logfeeder.include.default.level"; + private static final String TIMEZONE = "GMT"; private static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS"; @@ -52,15 +73,14 @@ public class LogLevelFilterHandler implements LogLevelFilterMonitor { }; private static LogSearchConfig config; - private static String clusterName = LogFeederUtil.getStringProperty("cluster.name"); private static boolean filterEnabled; private static List<String> defaultLogLevels; private static Map<String, LogLevelFilter> filters = new HashMap<>(); public static void init(LogSearchConfig config_) { config = config_; - filterEnabled = LogFeederUtil.getBooleanProperty("logfeeder.log.filter.enable", false); - defaultLogLevels = Arrays.asList(LogFeederUtil.getStringProperty("logfeeder.include.default.level").split(",")); + filterEnabled = LogFeederUtil.getBooleanProperty(LOG_FILTER_ENABLE_PROPERTY, DEFAULT_LOG_FILTER_ENABLE); + defaultLogLevels = Arrays.asList(LogFeederUtil.getStringProperty(INCLUDE_DEFAULT_LEVEL_PROPERTY).split(",")); TimeZone.setDefault(TimeZone.getTimeZone(TIMEZONE)); } @@ -100,7 +120,7 @@ public class LogLevelFilterHandler implements LogLevelFilterMonitor { defaultFilter.setDefaultLevels(defaultLogLevels); try { - config.createLogLevelFilter(clusterName, logId, defaultFilter); + config.createLogLevelFilter(LogFeederUtil.getClusterName(), logId, defaultFilter); filters.put(logId, defaultFilter); } catch (Exception e) { LOG.warn("Could not persist the default filter for log " + logId, e);
http://git-wip-us.apache.org/repos/asf/ambari/blob/5c9bdbfe/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java index 39526a5..fdad9a6 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java @@ -21,6 +21,7 @@ package org.apache.ambari.logfeeder.metrics; import org.apache.ambari.logfeeder.util.LogFeederUtil; import org.apache.ambari.logfeeder.util.SSLUtil; +import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; @@ -28,6 +29,8 @@ import org.apache.log4j.Logger; import com.google.common.base.Splitter; +import static org.apache.ambari.logfeeder.util.LogFeederUtil.LOGFEEDER_PROPERTIES_FILE; + import java.util.Collection; import java.util.List; @@ -35,21 +38,53 @@ import java.util.List; public class LogFeederAMSClient extends AbstractTimelineMetricsSink { private static final Logger LOG = Logger.getLogger(LogFeederAMSClient.class); + @LogSearchPropertyDescription( + name = "logfeeder.metrics.collector.hosts", + description = "Comma separated list of metric collector hosts.", + examples = {"c6401.ambari.apache.org"}, + sources = {LOGFEEDER_PROPERTIES_FILE} + ) + private static final String METRICS_COLLECTOR_HOSTS_PROPERTY = "logfeeder.metrics.collector.hosts"; + +
@LogSearchPropertyDescription( + name = "logfeeder.metrics.collector.protocol", + description = "The protocol used by metric collectors.", + examples = {"http", "https"}, + sources = {LOGFEEDER_PROPERTIES_FILE} + ) + private static final String METRICS_COLLECTOR_PROTOCOL_PROPERTY = "logfeeder.metrics.collector.protocol"; + + @LogSearchPropertyDescription( + name = "logfeeder.metrics.collector.port", + description = "The port used by metric collectors.", + examples = {"6188"}, + sources = {LOGFEEDER_PROPERTIES_FILE} + ) + private static final String METRICS_COLLECTOR_PORT_PROPERTY = "logfeeder.metrics.collector.port"; + + @LogSearchPropertyDescription( + name = "logfeeder.metrics.collector.path", + description = "The path used by metric collectors.", + examples = {"/ws/v1/timeline/metrics"}, + sources = {LOGFEEDER_PROPERTIES_FILE} + ) + private static final String METRICS_COLLECTOR_PATH_PROPERTY = "logfeeder.metrics.collector.path"; + private final List<String> collectorHosts; private final String collectorProtocol; private final String collectorPort; private final String collectorPath; public LogFeederAMSClient() { - String collectorHostsString = LogFeederUtil.getStringProperty("logfeeder.metrics.collector.hosts"); + String collectorHostsString = LogFeederUtil.getStringProperty(METRICS_COLLECTOR_HOSTS_PROPERTY); if (!StringUtils.isBlank(collectorHostsString)) { collectorHostsString = collectorHostsString.trim(); LOG.info("AMS collector Hosts=" + collectorHostsString); collectorHosts = Splitter.on(",").splitToList(collectorHostsString); - collectorProtocol = LogFeederUtil.getStringProperty("logfeeder.metrics.collector.protocol"); - collectorPort = LogFeederUtil.getStringProperty("logfeeder.metrics.collector.port"); - collectorPath = LogFeederUtil.getStringProperty("logfeeder.metrics.collector.path"); + collectorProtocol = LogFeederUtil.getStringProperty(METRICS_COLLECTOR_PROTOCOL_PROPERTY); + collectorPort = 
LogFeederUtil.getStringProperty(METRICS_COLLECTOR_PORT_PROPERTY); + collectorPath = LogFeederUtil.getStringProperty(METRICS_COLLECTOR_PATH_PROPERTY); } else { collectorHosts = null; collectorProtocol = null; http://git-wip-us.apache.org/repos/asf/ambari/blob/5c9bdbfe/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsManager.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsManager.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsManager.java index 942c0b4..1094852 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsManager.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsManager.java @@ -19,7 +19,6 @@ package org.apache.ambari.logfeeder.metrics; -import java.net.InetAddress; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -34,7 +33,6 @@ public class MetricsManager { private static final Logger LOG = Logger.getLogger(MetricsManager.class); private boolean isMetricsEnabled = false; - private String nodeHostName = null; private String appId = "logfeeder"; private long lastPublishTimeMS = 0; // Let's do the first publish immediately @@ -50,8 +48,7 @@ public class MetricsManager { amsClient = new LogFeederAMSClient(); if (amsClient.getCollectorUri(null) != null) { - findNodeHostName(); - if (nodeHostName == null) { + if (LogFeederUtil.hostName == null) { isMetricsEnabled = false; LOG.error("Failed getting hostname for node. 
Disabling publishing LogFeeder metrics"); } else { @@ -63,24 +60,6 @@ public class MetricsManager { } } - private void findNodeHostName() { - nodeHostName = LogFeederUtil.getStringProperty("node.hostname"); - if (nodeHostName == null) { - try { - nodeHostName = InetAddress.getLocalHost().getHostName(); - } catch (Throwable e) { - LOG.warn("Error getting hostname using InetAddress.getLocalHost().getHostName()", e); - } - } - if (nodeHostName == null) { - try { - nodeHostName = InetAddress.getLocalHost().getCanonicalHostName(); - } catch (Throwable e) { - LOG.warn("Error getting hostname using InetAddress.getLocalHost().getCanonicalHostName()", e); - } - } - } - public boolean isMetricsEnabled() { return isMetricsEnabled; } @@ -117,7 +96,7 @@ public class MetricsManager { LOG.debug("Creating new metric obbject for " + metric.metricsName); timelineMetric = new TimelineMetric(); timelineMetric.setMetricName(metric.metricsName); - timelineMetric.setHostName(nodeHostName); + timelineMetric.setHostName(LogFeederUtil.hostName); timelineMetric.setAppId(appId); timelineMetric.setStartTime(currMS); timelineMetric.setType("Long"); http://git-wip-us.apache.org/repos/asf/ambari/blob/5c9bdbfe/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java index 8f4b0b1..2b47a00 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java @@ -25,7 +25,7 @@ import org.apache.ambari.logfeeder.output.spool.LogSpoolerContext; import 
org.apache.ambari.logfeeder.output.spool.RolloverCondition; import org.apache.ambari.logfeeder.output.spool.RolloverHandler; import org.apache.ambari.logfeeder.util.LogFeederUtil; -import org.apache.ambari.logfeeder.util.LogfeederHDFSUtil; +import org.apache.ambari.logfeeder.util.LogFeederHDFSUtil; import org.apache.ambari.logfeeder.util.PlaceholderUtil; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.FileSystem; @@ -87,7 +87,7 @@ public class OutputHDFSFile extends Output implements RolloverHandler, RolloverC HashMap<String, String> contextParam = buildContextParam(); hdfsOutDir = PlaceholderUtil.replaceVariables(hdfsOutDir, contextParam); LOG.info("hdfs Output dir=" + hdfsOutDir); - String localFileDir = LogFeederUtil.getLogfeederTempDir() + "hdfs/service/"; + String localFileDir = LogFeederUtil.getLogFeederTempDir() + "hdfs/service/"; logSpooler = new LogSpooler(localFileDir, filenamePrefix, this, this); this.startHDFSCopyThread(); } @@ -124,13 +124,13 @@ public class OutputHDFSFile extends Output implements RolloverHandler, RolloverC Iterator<File> localFileIterator = localReadyFiles.iterator(); while (localFileIterator.hasNext()) { File localFile = localFileIterator.next(); - fileSystem = LogfeederHDFSUtil.buildFileSystem(hdfsHost, hdfsPort); + fileSystem = LogFeederHDFSUtil.buildFileSystem(hdfsHost, hdfsPort); if (fileSystem != null && localFile.exists()) { String destFilePath = hdfsOutDir + "/" + localFile.getName(); String localPath = localFile.getAbsolutePath(); boolean overWrite = true; boolean delSrc = true; - boolean isCopied = LogfeederHDFSUtil.copyFromLocal(localFile.getAbsolutePath(), destFilePath, fileSystem, + boolean isCopied = LogFeederHDFSUtil.copyFromLocal(localFile.getAbsolutePath(), destFilePath, fileSystem, overWrite, delSrc); if (isCopied) { LOG.debug("File copy to hdfs hdfspath :" + destFilePath + " and deleted local file :" + localPath); @@ -179,7 +179,7 @@ public class OutputHDFSFile extends Output implements 
RolloverHandler, RolloverC LOG.error(" Current thread : '" + Thread.currentThread().getName() + "' does not have permission to interrupt the Thread: '" + hdfsCopyThread.getName() + "'"); } - LogfeederHDFSUtil.closeFileSystem(fileSystem); + LogFeederHDFSUtil.closeFileSystem(fileSystem); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/5c9bdbfe/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java index 076d12d..9f41a15 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java @@ -205,7 +205,7 @@ public class OutputS3File extends Output implements RolloverCondition, RolloverH @VisibleForTesting protected LogSpooler createSpooler(String filePath) { - String spoolDirectory = LogFeederUtil.getLogfeederTempDir() + "/s3/service"; + String spoolDirectory = LogFeederUtil.getLogFeederTempDir() + "/s3/service"; LOG.info(String.format("Creating spooler with spoolDirectory=%s, filePath=%s", spoolDirectory, filePath)); return new LogSpooler(spoolDirectory, new File(filePath).getName()+"-", this, this, s3OutputConfiguration.getRolloverTimeThresholdSecs()); http://git-wip-us.apache.org/repos/asf/ambari/blob/5c9bdbfe/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java index d37a3bb..162a7f8 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java @@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit; import org.apache.ambari.logfeeder.input.InputMarker; import org.apache.ambari.logfeeder.util.DateUtil; import org.apache.ambari.logfeeder.util.LogFeederUtil; +import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription; import org.apache.commons.lang3.StringUtils; import org.apache.log4j.Level; import org.apache.log4j.Logger; @@ -51,9 +52,32 @@ import org.apache.solr.client.solrj.response.UpdateResponse; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrInputDocument; +import static org.apache.ambari.logfeeder.util.LogFeederUtil.LOGFEEDER_PROPERTIES_FILE; + public class OutputSolr extends Output { + + private static final String DEFAULT_SOLR_JAAS_FILE = "/etc/security/keytabs/logsearch_solr.service.keytab"; + @LogSearchPropertyDescription( + name = "logfeeder.solr.jaas.file", + description = "The jaas file used for solr.", + examples = {"/etc/ambari-logsearch-logfeeder/conf/logfeeder_jaas.conf"}, + defaultValue = DEFAULT_SOLR_JAAS_FILE, + sources = {LOGFEEDER_PROPERTIES_FILE} + ) + private static final String SOLR_JAAS_FILE_PROPERTY = "logfeeder.solr.jaas.file"; + private static final Logger LOG = Logger.getLogger(OutputSolr.class); + private static final boolean DEFAULT_SOLR_KERBEROS_ENABLE = false; + @LogSearchPropertyDescription( + name = "logfeeder.solr.kerberos.enable", + description = "Enables using kerberos for accessing solr.", + examples = {"true"}, + defaultValue = DEFAULT_SOLR_KERBEROS_ENABLE + "", + sources = {LOGFEEDER_PROPERTIES_FILE} + ) + private static final String 
SOLR_KERBEROS_ENABLE_PROPERTY = "logfeeder.solr.kerberos.enable"; + private static final int DEFAULT_MAX_BUFFER_SIZE = 5000; private static final int DEFAULT_MAX_INTERVAL_MS = 3000; private static final int DEFAULT_NUMBER_OF_SHARDS = 1; @@ -127,8 +151,8 @@ public class OutputSolr extends Output { private void setupSecurity() { - String jaasFile = LogFeederUtil.getStringProperty("logfeeder.solr.jaas.file", "/etc/security/keytabs/logsearch_solr.service.keytab"); - boolean securityEnabled = LogFeederUtil.getBooleanProperty("logfeeder.solr.kerberos.enable", false); + String jaasFile = LogFeederUtil.getStringProperty(SOLR_JAAS_FILE_PROPERTY, DEFAULT_SOLR_JAAS_FILE); + boolean securityEnabled = LogFeederUtil.getBooleanProperty(SOLR_KERBEROS_ENABLE_PROPERTY, DEFAULT_SOLR_KERBEROS_ENABLE); if (securityEnabled) { System.setProperty("java.security.auth.login.config", jaasFile); HttpClientUtil.setConfigurer(new Krb5HttpClientConfigurer()); http://git-wip-us.apache.org/repos/asf/ambari/blob/5c9bdbfe/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java index 90d1df6..8ade992 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java @@ -27,11 +27,8 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.StandardCopyOption; import java.nio.file.attribute.BasicFileAttributes; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; -import org.apache.commons.io.FileUtils; import org.apache.log4j.Logger; import 
org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.type.TypeReference; @@ -42,18 +39,6 @@ public class FileUtil { private FileUtil() { throw new UnsupportedOperationException(); } - - public static List<File> getAllFileFromDir(File directory, String extension, boolean checkInSubDir) { - if (!directory.exists()) { - LOG.error(directory.getAbsolutePath() + " is not exists "); - } else if (!directory.isDirectory()) { - LOG.error(directory.getAbsolutePath() + " is not Directory "); - } else { - return (List<File>) FileUtils.listFiles(directory, new String[]{extension}, checkInSubDir); - } - return new ArrayList<File>(); - } - public static Object getFileKey(File file) { try { http://git-wip-us.apache.org/repos/asf/ambari/blob/5c9bdbfe/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederHDFSUtil.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederHDFSUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederHDFSUtil.java new file mode 100644 index 0000000..4248ae1 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederHDFSUtil.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.logfeeder.util; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.Logger; + +public class LogFeederHDFSUtil { + private static final Logger LOG = Logger.getLogger(LogFeederHDFSUtil.class); + + private LogFeederHDFSUtil() { + throw new UnsupportedOperationException(); + } + + public static boolean copyFromLocal(String sourceFilepath, String destFilePath, FileSystem fileSystem, boolean overwrite, + boolean delSrc) { + Path src = new Path(sourceFilepath); + Path dst = new Path(destFilePath); + boolean isCopied = false; + try { + LOG.info("copying localfile := " + sourceFilepath + " to hdfsPath := " + destFilePath); + fileSystem.copyFromLocalFile(delSrc, overwrite, src, dst); + isCopied = true; + } catch (Exception e) { + LOG.error("Error copying local file :" + sourceFilepath + " to hdfs location : " + destFilePath, e); + } + return isCopied; + } + + public static FileSystem buildFileSystem(String hdfsHost, String hdfsPort) { + try { + Configuration configuration = buildHdfsConfiguration(hdfsHost, hdfsPort); + FileSystem fs = FileSystem.get(configuration); + return fs; + } catch (Exception e) { + LOG.error("Exception is buildFileSystem :", e); + } + return null; + } + + private static Configuration buildHdfsConfiguration(String hdfsHost, String hdfsPort) { + String url = "hdfs://" + hdfsHost + ":" + hdfsPort + "/"; + Configuration configuration = new Configuration(); + 
configuration.set("fs.default.name", url); + return configuration; + } + + public static void closeFileSystem(FileSystem fileSystem) { + if (fileSystem != null) { + try { + fileSystem.close(); + } catch (IOException e) { + LOG.error(e.getLocalizedMessage(), e.getCause()); + } + } + } +} \ No newline at end of file