This is an automated email from the ASF dual-hosted git repository. oleewere pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ambari-logsearch.git
The following commit(s) were added to refs/heads/master by this push: new 72b3dd6 AMBARI-24833. Support for cloud logs to using filters + JSON output (#26) 72b3dd6 is described below commit 72b3dd6d39d233db5b02536b3c6a3215971f6cd6 Author: Olivér Szabó <oleew...@gmail.com> AuthorDate: Mon Nov 19 10:32:15 2018 +0100 AMBARI-24833. Support for cloud logs to using filters + JSON output (#26) * AMBARI-24833. Support for cloud logs to using filters + JSON output * AMBARI-24833. Do not filter anything if filters are not enabled * AMBARI-24833. Fix intermittent issues. * AMBARI-24833. Edit comment --- .../local/LogSearchConfigLogFeederLocal.java | 42 ++++++-- .../config/zookeeper/LogLevelFilterManagerZK.java | 1 + .../logfeeder/common/LogFeederConstants.java | 1 + .../ambari/logfeeder/conf/LogFeederProps.java | 18 ++++ ...andler.java => AbstractInputConfigHandler.java} | 84 ++-------------- .../impl/CloudStorageInputConfigHandler.java | 14 ++- .../operations/impl/DefaultInputConfigHandler.java | 62 +----------- .../logfeeder/output/OutputLineEnricher.java | 109 +++++++++++++++++++++ .../ambari/logfeeder/output/OutputManagerImpl.java | 76 ++------------ .../output/cloud/CloudStorageLoggerFactory.java | 14 ++- .../output/cloud/CloudStorageOutputManager.java | 27 ++++- .../output/cloud/CloudStorageUploader.java | 2 +- .../src/main/resources/logfeeder.properties | 1 + 13 files changed, 232 insertions(+), 219 deletions(-) diff --git a/ambari-logsearch-config-local/src/main/java/org/apache/ambari/logsearch/config/local/LogSearchConfigLogFeederLocal.java b/ambari-logsearch-config-local/src/main/java/org/apache/ambari/logsearch/config/local/LogSearchConfigLogFeederLocal.java index f6cb519..12af637 100644 --- a/ambari-logsearch-config-local/src/main/java/org/apache/ambari/logsearch/config/local/LogSearchConfigLogFeederLocal.java +++ b/ambari-logsearch-config-local/src/main/java/org/apache/ambari/logsearch/config/local/LogSearchConfigLogFeederLocal.java @@ -84,12 +84,7 @@ public class LogSearchConfigLogFeederLocal extends LogSearchConfigLocal implemen File[] inputConfigFiles = new File(configDir).listFiles(inputConfigFileFilter); if (inputConfigFiles != null) { for (File inputConfigFile : inputConfigFiles) { - String inputConfig = new String(Files.readAllBytes(inputConfigFile.toPath())); - Matcher m = serviceNamePattern.matcher(inputConfigFile.getName()); - m.find(); - String serviceName = m.group(1); - JsonElement inputConfigJson = JsonHelper.mergeGlobalConfigWithInputConfig(parser, inputConfig, globalConfigNode); - inputConfigMonitor.loadInputConfigs(serviceName, InputConfigGson.gson.fromJson(inputConfigJson, InputConfigImpl.class)); + tryLoadingInputConfig(inputConfigMonitor, parser, globalConfigNode, inputConfigFile); } } final FileSystem fs = FileSystems.getDefault(); @@ -100,6 +95,41 @@ public class LogSearchConfigLogFeederLocal extends LogSearchConfigLocal implemen executorService.submit(updater); } + private void tryLoadingInputConfig(InputConfigMonitor inputConfigMonitor, JsonParser parser, JsonArray globalConfigNode, File inputConfigFile) throws Exception { + // note: that will try to solve a intermittent issue when the input config json is a null string (during file generation), that process will re-try to process the files a few times + int tries = 0; + while(true) { + tries++; + Matcher m = serviceNamePattern.matcher(inputConfigFile.getName()); + m.find(); + String inputConfig = new String(Files.readAllBytes(inputConfigFile.toPath())); + String serviceName = m.group(1); + JsonElement inputConfigJson = null; + logger.info("Trying to load '{}' service input config from input file '{}'", serviceName, inputConfigFile.getAbsolutePath()); + try { + inputConfigJson = JsonHelper.mergeGlobalConfigWithInputConfig(parser, inputConfig, globalConfigNode); + } catch (Exception e) { + final String errorMessage; + if (tries < 3) { + errorMessage = String.format("Cannot parse input config: %s, will retry in a few seconds again (tries: %s)", inputConfig, String.valueOf(tries)); + logger.error(errorMessage, e); + try { + Thread.sleep(2000); + } catch (Exception ex) { + // skip + } + continue; + } else { + errorMessage = String.format("Cannot parse input config: %s, after %s tries. Will skip to processing it", inputConfig, String.valueOf(tries)); + logger.error(errorMessage, e); + break; + } + } + inputConfigMonitor.loadInputConfigs(serviceName, InputConfigGson.gson.fromJson(inputConfigJson, InputConfigImpl.class)); + break; + } + } + @Override public void close() throws IOException { } diff --git a/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogLevelFilterManagerZK.java b/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogLevelFilterManagerZK.java index fd08e07..0975c39 100644 --- a/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogLevelFilterManagerZK.java +++ b/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogLevelFilterManagerZK.java @@ -48,6 +48,7 @@ public class LogLevelFilterManagerZK implements LogLevelFilterManager { public LogLevelFilterManagerZK(Map<String, String> properties) throws Exception { this.client = LogSearchConfigZKHelper.createZKClient(properties); + this.client.start(); this.serverCache = new TreeCache(client, "/"); this.aclList = LogSearchConfigZKHelper.getAcls(properties); this.gson = LogSearchConfigZKHelper.createGson(); diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java index b5fffa8..f9ef32d 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java @@ -112,6 +112,7 @@ public class LogFeederConstants { public static final String CLOUD_STORAGE_BUCKET = "logfeeder.cloud.storage.bucket"; public static final String CLOUD_STORAGE_BUCKET_BOOTSTRAP = "logfeeder.cloud.storage.bucket.bootstrap"; public static final String CLOUD_STORAGE_USE_HDFS_CLIENT = "logfeeder.cloud.storage.use.hdfs.client"; + public static final String CLOUD_STORAGE_USE_FILTERS = "logfeeder.cloud.storage.use.filters"; public static final String CLOUD_STORAGE_CUSTOM_FS = "logfeeder.cloud.storage.custom.fs"; public static final String CLOUD_STORAGE_BASE_PATH = "logfeeder.cloud.storage.base.path"; diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java index 83f10e4..f2eb6c7 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java @@ -289,6 +289,16 @@ public class LogFeederProps implements LogFeederProperties { @Value("${"+ LogFeederConstants.HDFS_USER + ":}") private String logfeederHdfsUser; + @LogSearchPropertyDescription( + name = LogFeederConstants.CLOUD_STORAGE_USE_FILTERS, + description = "Use filters for inputs (with filters the output format will be JSON)", + examples = {"true"}, + defaultValue = "false", + sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE} + ) + @Value("${" + LogFeederConstants.CLOUD_STORAGE_USE_FILTERS + ":false}") + private boolean cloudStorageUseFilters; + @Inject private LogEntryCacheConfig logEntryCacheConfig; @@ -522,6 +532,14 @@ public class LogFeederProps implements LogFeederProperties { this.customFs = customFs; } + public boolean isCloudStorageUseFilters() { + return cloudStorageUseFilters; + } + + public void setCloudStorageUseFilters(boolean cloudStorageUseFilters) { + this.cloudStorageUseFilters = cloudStorageUseFilters; + } + public String getCloudBasePath() { return cloudBasePath; } diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/AbstractInputConfigHandler.java similarity index 53% copy from ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java copy to ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/AbstractInputConfigHandler.java index 44da631..31bfd0d 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/AbstractInputConfigHandler.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -18,93 +18,29 @@ */ package org.apache.ambari.logfeeder.manager.operations.impl; -import org.apache.ambari.logfeeder.manager.operations.InputConfigHandler; -import org.apache.ambari.logfeeder.input.InputSimulate; import org.apache.ambari.logfeeder.manager.InputConfigHolder; +import org.apache.ambari.logfeeder.manager.operations.InputConfigHandler; import org.apache.ambari.logfeeder.plugin.common.AliasUtil; import org.apache.ambari.logfeeder.plugin.filter.Filter; import org.apache.ambari.logfeeder.plugin.input.Input; -import org.apache.ambari.logfeeder.plugin.output.Output; import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterDescriptor; -import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig; -import org.apache.ambari.logsearch.config.api.model.inputconfig.InputDescriptor; import org.apache.commons.lang.BooleanUtils; import org.apache.commons.lang3.StringUtils; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import java.util.ArrayList; import java.util.Collections; import java.util.List; /** - * Holds input/filter/output operations in default Log Feeder mode. + * Holds common operations for input config handlers */ -public class DefaultInputConfigHandler implements InputConfigHandler { - - private static final Logger logger = LogManager.getLogger(DefaultInputConfigHandler.class); - - @Override - public void init(InputConfigHolder inputConfigHolder) throws Exception { - } - - @Override - public void loadInputs(String serviceName, InputConfigHolder inputConfigHolder, InputConfig inputConfig) { - loadInputs(serviceName, inputConfigHolder); - loadFilters(serviceName, inputConfigHolder); - } - - @Override - public void assignInputsToOutputs(String serviceName, InputConfigHolder inputConfigHolder, InputConfig config) { - for (Input input : inputConfigHolder.getInputManager().getInputList(serviceName)) { - for (Output output : inputConfigHolder.getOutputManager().getOutputs()) { - if (input.isOutputRequired(output)) { - input.addOutput(output); - } - } - } +public abstract class AbstractInputConfigHandler implements InputConfigHandler { - // In case of simulation copies of the output are added for each simulation instance, these must be added to the manager - for (Output output : InputSimulate.getSimulateOutputs()) { - output.setLogSearchConfig(inputConfigHolder.getConfig()); - inputConfigHolder.getOutputManager().add(output); - } - } - - private void loadInputs(String serviceName, InputConfigHolder inputConfigHolder) { - for (InputDescriptor inputDescriptor : inputConfigHolder.getInputConfigList()) { - if (inputDescriptor == null) { - logger.warn("Input descriptor is smpty. Skipping..."); - continue; - } - - String source = inputDescriptor.getSource(); - if (StringUtils.isEmpty(source)) { - logger.error("Input block doesn't have source element"); - continue; - } - Input input = (Input) AliasUtil.getClassInstance(source, AliasUtil.AliasType.INPUT); - if (input == null) { - logger.error("Input object could not be found"); - continue; - } - input.setType(source); - input.setLogType(inputDescriptor.getType()); - input.loadConfig(inputDescriptor); - - if (input.isEnabled()) { - input.setOutputManager(inputConfigHolder.getOutputManager()); - input.setInputManager(inputConfigHolder.getInputManager()); - inputConfigHolder.getInputManager().add(serviceName, input); - logger.info("New input object registered for service '{}': '{}'", serviceName, input.getLogType()); - input.logConfigs(); - } else { - logger.info("Input is disabled. So ignoring it. " + input.getShortDescription()); - } - } - } + private static final Logger logger = LogManager.getLogger(AbstractInputConfigHandler.class); - private void loadFilters(String serviceName, InputConfigHolder inputConfigHolder) { + protected void loadFilters(String serviceName, InputConfigHolder inputConfigHolder) { sortFilters(inputConfigHolder); List<Input> toRemoveInputList = new ArrayList<>(); @@ -152,7 +88,7 @@ public class DefaultInputConfigHandler implements InputConfigHandler { } } - private void sortFilters(InputConfigHolder inputConfigHolder) { + protected void sortFilters(InputConfigHolder inputConfigHolder) { Collections.sort(inputConfigHolder.getFilterConfigList(), (o1, o2) -> { Integer o1Sort = o1.getSortOrder(); Integer o2Sort = o2.getSortOrder(); diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/CloudStorageInputConfigHandler.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/CloudStorageInputConfigHandler.java index deb3a91..ac10b2d 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/CloudStorageInputConfigHandler.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/CloudStorageInputConfigHandler.java @@ -38,7 +38,7 @@ import java.util.List; /** * Holds input/filter/output operations in cloud Log Feeder mode. */ -public class CloudStorageInputConfigHandler implements InputConfigHandler { +public class CloudStorageInputConfigHandler extends AbstractInputConfigHandler { private static final Logger logger = LogManager.getLogger(CloudStorageInputConfigHandler.class); @@ -49,6 +49,7 @@ public class CloudStorageInputConfigHandler implements InputConfigHandler { @Override public void loadInputs(String serviceName, InputConfigHolder inputConfigHolder, InputConfig inputConfig) { + final boolean useFilters = inputConfigHolder.getLogFeederProps().isCloudStorageUseFilters(); for (InputDescriptor inputDescriptor : inputConfigHolder.getInputConfigList()) { if (inputDescriptor == null) { logger.warn("Input descriptor is smpty. Skipping..."); @@ -72,9 +73,11 @@ public class CloudStorageInputConfigHandler implements InputConfigHandler { input.setType(source); input.setLogType(LogFeederConstants.CLOUD_PREFIX + inputDescriptor.getType()); input.loadConfig(inputDescriptor); - FilterDummy filter = new FilterDummy(); - filter.setOutputManager(inputConfigHolder.getOutputManager()); - input.setFirstFilter(filter); + if (!useFilters) { + FilterDummy filter = new FilterDummy(); + filter.setOutputManager(inputConfigHolder.getOutputManager()); + input.setFirstFilter(filter); + } input.setCloudInput(true); if (input.isEnabled()) { @@ -87,6 +90,9 @@ public class CloudStorageInputConfigHandler implements InputConfigHandler { logger.info("Input is disabled. So ignoring it. " + input.getShortDescription()); } } + if (useFilters) { + loadFilters(serviceName, inputConfigHolder); + } } @Override diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java index 44da631..dd0fe3e 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java @@ -40,7 +40,7 @@ import java.util.List; /** * Holds input/filter/output operations in default Log Feeder mode. */ -public class DefaultInputConfigHandler implements InputConfigHandler { +public class DefaultInputConfigHandler extends AbstractInputConfigHandler { private static final Logger logger = LogManager.getLogger(DefaultInputConfigHandler.class); @@ -103,64 +103,4 @@ public class DefaultInputConfigHandler implements InputConfigHandler { } } } - - private void loadFilters(String serviceName, InputConfigHolder inputConfigHolder) { - sortFilters(inputConfigHolder); - - List<Input> toRemoveInputList = new ArrayList<>(); - for (Input input : inputConfigHolder.getInputManager().getInputList(serviceName)) { - for (FilterDescriptor filterDescriptor : inputConfigHolder.getFilterConfigList()) { - if (filterDescriptor == null) { - logger.warn("Filter descriptor is smpty. Skipping..."); - continue; - } - if (BooleanUtils.isFalse(filterDescriptor.isEnabled())) { - logger.debug("Ignoring filter " + filterDescriptor.getFilter() + " because it is disabled"); - continue; - } - if (!input.isFilterRequired(filterDescriptor)) { - logger.debug("Ignoring filter " + filterDescriptor.getFilter() + " for input " + input.getShortDescription()); - continue; - } - - String value = filterDescriptor.getFilter(); - if (StringUtils.isEmpty(value)) { - logger.error("Filter block doesn't have filter element"); - continue; - } - Filter filter = (Filter) AliasUtil.getClassInstance(value, AliasUtil.AliasType.FILTER); - if (filter == null) { - logger.error("Filter object could not be found"); - continue; - } - filter.loadConfig(filterDescriptor); - filter.setInput(input); - - filter.setOutputManager(inputConfigHolder.getOutputManager()); - input.addFilter(filter); - filter.logConfigs(); - } - - if (input.getFirstFilter() == null) { - toRemoveInputList.add(input); - } - } - - for (Input toRemoveInput : toRemoveInputList) { - logger.warn("There are no filters, we will ignore this input. " + toRemoveInput.getShortDescription()); - inputConfigHolder.getInputManager().removeInput(toRemoveInput); - } - } - - private void sortFilters(InputConfigHolder inputConfigHolder) { - Collections.sort(inputConfigHolder.getFilterConfigList(), (o1, o2) -> { - Integer o1Sort = o1.getSortOrder(); - Integer o2Sort = o2.getSortOrder(); - if (o1Sort == null || o2Sort == null) { - return 0; - } - - return o1Sort - o2Sort; - }); - } } diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineEnricher.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineEnricher.java new file mode 100644 index 0000000..bd9e3df --- /dev/null +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineEnricher.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.logfeeder.output; + +import com.google.common.hash.Hashing; +import org.apache.ambari.logfeeder.plugin.common.MetricData; +import org.apache.ambari.logfeeder.plugin.input.Input; +import org.apache.ambari.logfeeder.plugin.input.InputMarker; +import org.apache.ambari.logfeeder.util.LogFeederUtil; +import org.apache.commons.lang3.StringUtils; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Map; + +/** + * Utility class for fill output with other fields + */ +public class OutputLineEnricher { + + private static final Logger logger = LogManager.getLogger(OutputLineEnricher.class); + + private static final int MAX_OUTPUT_SIZE = 32765; // 32766-1 + + public void enrichFields(final Map<String, Object> jsonObj, final InputMarker inputMarker, final MetricData messageTruncateMetric) { + Input input = inputMarker.getInput(); + // Update the block with the context fields + for (Map.Entry<String, String> entry : input.getInputDescriptor().getAddFields().entrySet()) { + if (jsonObj.get(entry.getKey()) == null || entry.getKey().equals("cluster") && "null".equals(jsonObj.get(entry.getKey()))) { + jsonObj.put(entry.getKey(), entry.getValue()); + } + } + // TODO: Ideally most of the overrides should be configurable + LogFeederUtil.fillMapWithFieldDefaults(jsonObj, inputMarker, true); + if (input.isUseEventMD5() || input.isGenEventMD5()) { + String prefix = ""; + Object logtimeObj = jsonObj.get("logtime"); + if (logtimeObj != null) { + if (logtimeObj instanceof Date) { + prefix = "" + ((Date) logtimeObj).getTime(); + } else { + prefix = logtimeObj.toString(); + } + } + byte[] bytes = LogFeederUtil.getGson().toJson(jsonObj).getBytes(); + long eventMD5 = Hashing.md5().hashBytes(bytes).asLong(); + if (input.isGenEventMD5()) { + jsonObj.put("event_md5", prefix + Long.toString(eventMD5)); + } + if (input.isUseEventMD5()) { + jsonObj.put("id", prefix + Long.toString(eventMD5)); + } + } + jsonObj.computeIfAbsent("event_count", k -> 1); + if (StringUtils.isNotBlank(input.getInputDescriptor().getGroup())) { + jsonObj.put("group", input.getInputDescriptor().getGroup()); + } + if (inputMarker.getAllProperties().containsKey("line_number") && + (Integer) inputMarker.getAllProperties().get("line_number") > 0) { + jsonObj.put("logfile_line_number", inputMarker.getAllProperties().get("line_number")); + } + if (jsonObj.containsKey("log_message")) { + // TODO: Let's check size only for log_message for now + String logMessage = (String) jsonObj.get("log_message"); + logMessage = truncateLongLogMessage(messageTruncateMetric, jsonObj, input, logMessage); + jsonObj.put("message_md5", "" + Hashing.md5().hashBytes(logMessage.getBytes()).asLong()); + } + } + + @SuppressWarnings("unchecked") + private String truncateLongLogMessage(MetricData messageTruncateMetric, Map<String, Object> jsonObj, Input input, String logMessage) { + if (logMessage != null && logMessage.getBytes().length > MAX_OUTPUT_SIZE) { + messageTruncateMetric.value++; + String logMessageKey = input.getOutputManager().getClass().getSimpleName() + "_MESSAGESIZE"; + LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Message is too big. size=" + logMessage.getBytes().length + + ", input=" + input.getShortDescription() + ". Truncating to " + MAX_OUTPUT_SIZE + ", first upto 200 characters=" + + StringUtils.abbreviate(logMessage, 200), null, logger, Level.WARN); + logMessage = new String(logMessage.getBytes(), 0, MAX_OUTPUT_SIZE); + jsonObj.put("log_message", logMessage); + List<String> tagsList = (List<String>) jsonObj.get("tags"); + if (tagsList == null) { + tagsList = new ArrayList<>(); + jsonObj.put("tags", tagsList); + } + tagsList.add("error_message_truncated"); + } + return logMessage; + } +} diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManagerImpl.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManagerImpl.java index afe1c0a..b4c862d 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManagerImpl.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManagerImpl.java @@ -59,7 +59,8 @@ public class OutputManagerImpl extends OutputManager { @Inject private LogFeederProps logFeederProps; - private OutputLineFilter outputLineFilter = new OutputLineFilter(); + private final OutputLineEnricher outputLineEnricher = new OutputLineEnricher(); + private final OutputLineFilter outputLineFilter = new OutputLineFilter(); public List<Output> getOutputs() { return outputs; @@ -80,57 +81,12 @@ public class OutputManagerImpl extends OutputManager { @SuppressWarnings("unchecked") public void write(Map<String, Object> jsonObj, InputMarker inputMarker) { - Input input = inputMarker.getInput(); - - // Update the block with the context fields - for (Map.Entry<String, String> entry : input.getInputDescriptor().getAddFields().entrySet()) { - if (jsonObj.get(entry.getKey()) == null || entry.getKey().equals("cluster") && "null".equals(jsonObj.get(entry.getKey()))) { - jsonObj.put(entry.getKey(), entry.getValue()); - } - } - - // TODO: Ideally most of the overrides should be configurable - - LogFeederUtil.fillMapWithFieldDefaults(jsonObj, inputMarker, true); - jsonObj.putIfAbsent("level", LogFeederConstants.LOG_LEVEL_UNKNOWN); - - if (input.isUseEventMD5() || input.isGenEventMD5()) { - String prefix = ""; - Object logtimeObj = jsonObj.get("logtime"); - if (logtimeObj != null) { - if (logtimeObj instanceof Date) { - prefix = "" + ((Date) logtimeObj).getTime(); - } else { - prefix = logtimeObj.toString(); - } - } - - - byte[] bytes = LogFeederUtil.getGson().toJson(jsonObj).getBytes(); - long eventMD5 = Hashing.md5().hashBytes(bytes).asLong(); - if (input.isGenEventMD5()) { - jsonObj.put("event_md5", prefix + Long.toString(eventMD5)); - } - if (input.isUseEventMD5()) { - jsonObj.put("id", prefix + Long.toString(eventMD5)); - } - } - jsonObj.put("seq_num", docCounter++); - jsonObj.computeIfAbsent("event_count", k -> 1); - if (StringUtils.isNotBlank(input.getInputDescriptor().getGroup())) { - jsonObj.put("group", input.getInputDescriptor().getGroup()); - } - if (inputMarker.getAllProperties().containsKey("line_number") && - (Integer) inputMarker.getAllProperties().get("line_number") > 0) { - jsonObj.put("logfile_line_number", inputMarker.getAllProperties().get("line_number")); - } - if (jsonObj.containsKey("log_message")) { - // TODO: Let's check size only for log_message for now - String logMessage = (String) jsonObj.get("log_message"); - logMessage = truncateLongLogMessage(jsonObj, input, logMessage); - jsonObj.put("message_md5", "" + Hashing.md5().hashBytes(logMessage.getBytes()).asLong()); + if (docCounter == Long.MIN_VALUE) { + docCounter = 1; } + outputLineEnricher.enrichFields(jsonObj, inputMarker, messageTruncateMetric); + Input input = inputMarker.getInput(); List<String> defaultLogLevels = getDefaultLogLevels(input); if (logLevelFilterHandler.isAllowed(jsonObj, inputMarker, defaultLogLevels) && !outputLineFilter.apply(jsonObj, inputMarker.getInput())) { @@ -159,26 +115,6 @@ public class OutputManagerImpl extends OutputManager { } @SuppressWarnings("unchecked") - private String truncateLongLogMessage(Map<String, Object> jsonObj, Input input, String logMessage) { - if (logMessage != null && logMessage.getBytes().length > MAX_OUTPUT_SIZE) { - messageTruncateMetric.value++; - String logMessageKey = this.getClass().getSimpleName() + "_MESSAGESIZE"; - LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Message is too big. size=" + logMessage.getBytes().length + - ", input=" + input.getShortDescription() + ". Truncating to " + MAX_OUTPUT_SIZE + ", first upto 100 characters=" + - StringUtils.abbreviate(logMessage, 100), null, logger, Level.WARN); - logMessage = new String(logMessage.getBytes(), 0, MAX_OUTPUT_SIZE); - jsonObj.put("log_message", logMessage); - List<String> tagsList = (List<String>) jsonObj.get("tags"); - if (tagsList == null) { - tagsList = new ArrayList<String>(); - jsonObj.put("tags", tagsList); - } - tagsList.add("error_message_truncated"); - } - return logMessage; - } - - @SuppressWarnings("unchecked") public void write(String jsonBlock, InputMarker inputMarker) { List<String> defaultLogLevels = getDefaultLogLevels(inputMarker.getInput()); if (logLevelFilterHandler.isAllowed(jsonBlock, inputMarker, defaultLogLevels)) { diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageLoggerFactory.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageLoggerFactory.java index 8201051..0cfdbcc 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageLoggerFactory.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageLoggerFactory.java @@ -48,8 +48,11 @@ public class CloudStorageLoggerFactory { private static final String ARCHIVED_FOLDER = "archived"; private static final String DATE_PATTERN_SUFFIX_GZ = "-%d{yyyy-MM-dd-HH-mm-ss-SSS}.log.gz"; private static final String DATE_PATTERN_SUFFIX = "-%d{yyyy-MM-dd-HH-mm-ss-SSS}.log"; + private static final String JSON_DATE_PATTERN_SUFFIX_GZ = "-%d{yyyy-MM-dd-HH-mm-ss-SSS}.json.gz"; + private static final String JSON_DATE_PATTERN_SUFFIX = "-%d{yyyy-MM-dd-HH-mm-ss-SSS}.json"; public static Logger createLogger(Input input, LoggerContext loggerContext, LogFeederProps logFeederProps) { + boolean useJsonFormat = logFeederProps.isCloudStorageUseFilters(); String type = input.getLogType().replace(LogFeederConstants.CLOUD_PREFIX, ""); String uniqueThreadName = input.getThread().getName(); Configuration config = loggerContext.getConfiguration(); @@ -59,8 +62,15 @@ public class CloudStorageLoggerFactory { String archiveLogDir = Paths.get(baseDir, destination, ARCHIVED_FOLDER, type).toFile().getAbsolutePath(); boolean useGzip = logFeederProps.getRolloverConfig().isUseGzip(); - String archiveFilePattern = useGzip ? DATE_PATTERN_SUFFIX_GZ : DATE_PATTERN_SUFFIX; - String fileName = String.join(File.separator, activeLogDir, type + ".log"); + final String archiveFilePattern; + if (useJsonFormat) { + archiveFilePattern = useGzip ? JSON_DATE_PATTERN_SUFFIX_GZ : JSON_DATE_PATTERN_SUFFIX; + } else { + archiveFilePattern = useGzip ? DATE_PATTERN_SUFFIX_GZ : DATE_PATTERN_SUFFIX; + } + + String logSuffix = useJsonFormat ? ".json" : ".log"; + String fileName = String.join(File.separator, activeLogDir, type + logSuffix); String filePattern = String.join(File.separator, archiveLogDir, type + archiveFilePattern); PatternLayout layout = PatternLayout.newBuilder() .withPattern(PatternLayout.DEFAULT_CONVERSION_PATTERN).build(); diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutputManager.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutputManager.java index 16b7e55..9be30a0 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutputManager.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutputManager.java @@ -18,7 +18,10 @@ */ package org.apache.ambari.logfeeder.output.cloud; +import org.apache.ambari.logfeeder.common.IdGeneratorHelper; import org.apache.ambari.logfeeder.conf.LogFeederProps; +import org.apache.ambari.logfeeder.output.OutputLineEnricher; +import org.apache.ambari.logfeeder.output.OutputLineFilter; import org.apache.ambari.logfeeder.plugin.common.MetricData; import org.apache.ambari.logfeeder.plugin.input.Input; import org.apache.ambari.logfeeder.plugin.input.InputMarker; @@ -33,6 +36,7 @@ import java.io.File; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; /** * Handle output operations for sending cloud inputs to a cloud storage destination @@ -47,10 +51,25 @@ public class CloudStorageOutputManager extends OutputManager { private CloudStorageOutput storageOutput = null; private List<Output> outputList = new ArrayList<>(); + private final AtomicBoolean useFilters = new AtomicBoolean(false); + + private final MetricData messageTruncateMetric = new MetricData(null, false); + private final OutputLineEnricher outputLineEnricher = new OutputLineEnricher(); + private final OutputLineFilter outputLineFilter = new OutputLineFilter(); @Override public void write(Map<String, Object> jsonObj, InputMarker marker) { - write(LogFeederUtil.getGson().toJson(jsonObj), marker); + if (useFilters.get()) { + outputLineEnricher.enrichFields(jsonObj, marker, messageTruncateMetric); + if (!outputLineFilter.apply(jsonObj, marker.getInput())) { + if (jsonObj.get("id") == null) { + jsonObj.put("id", IdGeneratorHelper.generateUUID(jsonObj, storageOutput.getIdFields())); + } + write(LogFeederUtil.getGson().toJson(jsonObj), marker); + } + } else { + write(LogFeederUtil.getGson().toJson(jsonObj), marker); + } } @Override @@ -82,6 +101,12 @@ public class CloudStorageOutputManager extends OutputManager { storageOutput = new CloudStorageOutput(logFeederProps); storageOutput.init(logFeederProps); add(storageOutput); + useFilters.set(logFeederProps.isCloudStorageUseFilters()); + if (useFilters.get()) { + logger.info("Using filters are enabled for cloud log outputs"); + } else { + logger.info("Using filters are disabled for cloud log outputs"); + } } @Override diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageUploader.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageUploader.java index b76f441..af9326a 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageUploader.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageUploader.java @@ -76,7 +76,7 @@ public class CloudStorageUploader extends Thread { try { final String archiveLogDir = String.join(File.separator, logFeederProps.getRolloverConfig().getRolloverArchiveBaseDir(), uploaderType, "archived"); if (new File(archiveLogDir).exists()) { - String[] extensions = {"log", "gz"}; + String[] extensions = {"log", "json", "gz"}; Collection<File> filesToUpload = FileUtils.listFiles(new File(archiveLogDir), extensions, true); if (filesToUpload.isEmpty()) { logger.debug("Not found any files to upload."); diff --git a/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties b/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties index c7ea335..45c05f3 100644 --- a/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties +++ b/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties @@ -47,6 +47,7 @@ logfeeder.cloud.storage.uploader.interval.seconds=1 logfeeder.cloud.storage.upload.on.shutdown=true logfeeder.cloud.storage.base.path=/apps/logfeeder logfeeder.cloud.storage.use.hdfs.client=true +logfeeder.cloud.storage.use.filters=false logfeeder.cloud.storage.bucket=logfeeder logfeeder.cloud.storage.bucket.bootstrap=true