[ https://issues.apache.org/jira/browse/AMBARI-24833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16689276#comment-16689276 ]
ASF GitHub Bot commented on AMBARI-24833:
-----------------------------------------

oleewere closed pull request #25: AMBARI-24833. Extend cloud log rolling options
URL: https://github.com/apache/ambari-logsearch/pull/25

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/ambari-logsearch-logfeeder/pom.xml b/ambari-logsearch-logfeeder/pom.xml
index e71b3cc555..003b23357f 100644
--- a/ambari-logsearch-logfeeder/pom.xml
+++ b/ambari-logsearch-logfeeder/pom.xml
@@ -150,6 +150,12 @@
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-aws</artifactId>
       <version>${hadoop.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.amazonaws</groupId>
+          <artifactId>aws-java-sdk-bundle</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>com.amazonaws</groupId>
@@ -161,6 +167,11 @@
       <artifactId>aws-java-sdk-s3</artifactId>
       <version>${aws-sdk.version}</version>
     </dependency>
+    <dependency>
+      <groupId>com.amazonaws</groupId>
+      <artifactId>aws-java-sdk-dynamodb</artifactId>
+      <version>${aws-sdk.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-azure</artifactId>
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
index 11d351f23d..b5fffa829b 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
@@ -34,10 +34,6 @@
   // Event History Constants History
   public static final String VALUES = "jsons";
   public static final String ROW_TYPE = "rowtype";
-
-  // S3 Constants
-  public static final String S3_PATH_START_WITH = "s3://";
-  public static final String S3_PATH_SEPARATOR = "/";
 
   public static final String IN_MEMORY_TIMESTAMP = "in_memory_timestamp";
@@ -122,14 +118,17 @@
   public static final String CLOUD_ROLLOVER_ARCHIVE_LOCATION = "logfeeder.cloud.rollover.archive.base.dir";
   public static final String CLOUD_ROLLOVER_THRESHOLD_TIME_MIN = "logfeeder.cloud.rollover.threshold.min";
   public static final String CLOUD_ROLLOVER_THRESHOLD_TIME_SIZE = "logfeeder.cloud.rollover.threshold.size";
+  public static final String CLOUD_ROLLOVER_MAX_BACKUP_FILES = "logfeeder.cloud.rollover.max.files";
+  public static final String CLOUD_ROLLOVER_THRESHOLD_TIME_SIZE_UNIT = "logfeeder.cloud.rollover.threshold.size.unit";
   public static final String CLOUD_ROLLOVER_USE_GZIP = "logfeeder.cloud.rollover.use.gzip";
   public static final String CLOUD_ROLLOVER_IMMEDIATE_FLUSH = "logfeeder.cloud.rollover.immediate.flush";
   public static final String CLOUD_ROLLOVER_ON_SHUTDOWN = "logfeeder.cloud.rollover.on.shutdown";
   public static final String CLOUD_ROLLOVER_ON_STARTUP = "logfeeder.cloud.rollover.on.startup";
 
+  public static final String HDFS_USER = "logfeeder.hdfs.user";
+
   public static final String HDFS_HOST = "logfeeder.hdfs.host";
   public static final String HDFS_PORT = "logfeeder.hdfs.port";
-  public static final String HDFS_USER = "logfeeder.hdfs.user";
   public static final String HDFS_FILE_PERMISSIONS = "logfeeder.hdfs.file.permissions";
   public static final String HDFS_KERBEROS = "logfeeder.hdfs.kerberos";
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/CloudStorageDestination.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/CloudStorageDestination.java
index 1a7eafa509..0246b98306 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/CloudStorageDestination.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/CloudStorageDestination.java
@@ -19,7 +19,7 @@
 package org.apache.ambari.logfeeder.conf;
 
 public enum CloudStorageDestination {
-  HDFS("hdfs"), S3("s3"), GCS("gcs"), ADLS("adls"), NONE("none");
+  HDFS("hdfs"), S3("s3"), GCS("gcs"), ADLS("adls"), WASB("wasb"), DEFAULT_FS("default-fs"), NONE("none");
 
   private String text;
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
index d32e1df9db..83f10e497c 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
@@ -224,7 +224,8 @@
   @LogSearchPropertyDescription(
     name = LogFeederConstants.CLOUD_STORAGE_DESTINATION,
     description = "Type of storage that is the destination for cloud output logs.",
-    examples = {"hdfs", "s3", "gcs", "adls", "none"},
+    examples = {"hdfs", "s3", "gcs", "adls", "wasb", "none"},
+    defaultValue = "none",
     sources = {LogFeederConstants.CLOUD_STORAGE_DESTINATION}
   )
   @Value("${" + LogFeederConstants.CLOUD_STORAGE_DESTINATION + ":none}")
@@ -279,6 +280,15 @@
   @Value("${" + LogFeederConstants.CLOUD_STORAGE_BASE_PATH + ":}")
   private String cloudBasePath;
 
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.HDFS_USER,
+    description = "Overrides HADOOP_USER_NAME variable at runtime",
+    examples = {"hdfs"},
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${"+ LogFeederConstants.HDFS_USER + ":}")
+  private String logfeederHdfsUser;
+
   @Inject
   private LogEntryCacheConfig logEntryCacheConfig;
@@ -492,6 +502,14 @@ public boolean isUseCloudHdfsClient() {
     return useCloudHdfsClient;
   }
 
+  public String getLogfeederHdfsUser() {
+    return logfeederHdfsUser;
+  }
+
+  public void setLogfeederHdfsUser(String logfeederHdfsUser) {
+    this.logfeederHdfsUser = logfeederHdfsUser;
+  }
+
   public void setUseCloudHdfsClient(boolean useCloudHdfsClient) {
     this.useCloudHdfsClient = useCloudHdfsClient;
   }
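For reference, the new "wasb" and "default-fs" entries above are plain text keys on the enum that match the strings accepted by logfeeder.cloud.storage.destination. A minimal, self-contained sketch of how such a configured text value could be resolved back to the enum; the fromText() helper below is illustrative only and not the project's actual lookup method:

// Illustrative sketch only: mirrors the extended enum from the diff above.
// The fromText() helper is hypothetical, not taken from the Log Feeder code base.
public enum CloudStorageDestination {
  HDFS("hdfs"), S3("s3"), GCS("gcs"), ADLS("adls"), WASB("wasb"), DEFAULT_FS("default-fs"), NONE("none");

  private final String text;

  CloudStorageDestination(String text) {
    this.text = text;
  }

  // Resolve a configured value such as "wasb" to the matching enum constant.
  public static CloudStorageDestination fromText(String value) {
    for (CloudStorageDestination destination : values()) {
      if (destination.text.equalsIgnoreCase(value)) {
        return destination;
      }
    }
    return NONE; // fall back to the documented default
  }
}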
"Enable kerberos support for HDFS", @@ -97,14 +88,6 @@ public void setHdfsFilePermissions(String hdfsFilePermissions) { this.hdfsFilePermissions = hdfsFilePermissions; } - public String getHdfsUser() { - return hdfsUser; - } - - public void setHdfsUser(String hdfsUser) { - this.hdfsUser = hdfsUser; - } - public boolean isSecure() { return secure; } diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/RolloverConfig.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/RolloverConfig.java index 7465a50354..282792ae86 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/RolloverConfig.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/RolloverConfig.java @@ -49,12 +49,32 @@ @LogSearchPropertyDescription( name = LogFeederConstants.CLOUD_ROLLOVER_THRESHOLD_TIME_SIZE, description = "Rollover cloud log files after the log file size reach this limit", - examples = {"1024KB"}, - defaultValue = "80MB", + examples = {"1024"}, + defaultValue = "80", sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE} ) - @Value("${"+ LogFeederConstants.CLOUD_ROLLOVER_THRESHOLD_TIME_SIZE + ":80MB}") - private String rolloverSize; + @Value("${"+ LogFeederConstants.CLOUD_ROLLOVER_THRESHOLD_TIME_SIZE + ":80}") + private Integer rolloverSize; + + @LogSearchPropertyDescription( + name = LogFeederConstants.CLOUD_ROLLOVER_MAX_BACKUP_FILES, + description = "The number of max backup log files for rolled over logs.", + examples = {"50"}, + defaultValue = "10", + sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE} + ) + @Value("${"+ LogFeederConstants.CLOUD_ROLLOVER_MAX_BACKUP_FILES + ":10}") + private Integer rolloverMaxBackupFiles; + + @LogSearchPropertyDescription( + name = LogFeederConstants.CLOUD_ROLLOVER_THRESHOLD_TIME_SIZE_UNIT, + description = "Rollover cloud log file size unit (e.g: KB, MB etc.)", + examples = {"KB"}, + defaultValue = "MB", + sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE} + ) + @Value("${"+ LogFeederConstants.CLOUD_ROLLOVER_THRESHOLD_TIME_SIZE_UNIT + ":MB}") + private String rolloverSizeFormat; @LogSearchPropertyDescription( name = LogFeederConstants.CLOUD_ROLLOVER_USE_GZIP, @@ -68,7 +88,7 @@ @LogSearchPropertyDescription( name = LogFeederConstants.CLOUD_ROLLOVER_IMMEDIATE_FLUSH, - description = "Immediately flush cloud logs (to active location).", + description = "Immediately flush temporal cloud logs (to active location).", examples = {"false"}, defaultValue = "true", sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE} @@ -78,7 +98,7 @@ @LogSearchPropertyDescription( name = LogFeederConstants.CLOUD_ROLLOVER_ON_SHUTDOWN, - description = "Rollover log files on shutdown", + description = "Rollover temporal cloud log files on shutdown", examples = {"false"}, defaultValue = "true", sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE} @@ -88,7 +108,7 @@ @LogSearchPropertyDescription( name = LogFeederConstants.CLOUD_ROLLOVER_ON_STARTUP, - description = "Rollover log files on startup", + description = "Rollover temporal cloud log files on startup", examples = {"false"}, defaultValue = "true", sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE} @@ -104,14 +124,30 @@ public void setRolloverThresholdTimeMins(int rolloverThresholdTimeMins) { this.rolloverThresholdTimeMins = rolloverThresholdTimeMins; } - public String getRolloverSize() { + public Integer getRolloverMaxBackupFiles() { + return rolloverMaxBackupFiles; + } + + 
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageLoggerFactory.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageLoggerFactory.java
index f42e556857..8201051655 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageLoggerFactory.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageLoggerFactory.java
@@ -26,6 +26,7 @@
 import org.apache.logging.log4j.core.LoggerContext;
 import org.apache.logging.log4j.core.appender.RollingFileAppender;
 import org.apache.logging.log4j.core.appender.rolling.CompositeTriggeringPolicy;
+import org.apache.logging.log4j.core.appender.rolling.DefaultRolloverStrategy;
 import org.apache.logging.log4j.core.appender.rolling.OnStartupTriggeringPolicy;
 import org.apache.logging.log4j.core.appender.rolling.SizeBasedTriggeringPolicy;
 import org.apache.logging.log4j.core.config.AppenderRef;
@@ -64,8 +65,8 @@ public static Logger createLogger(Input input, LoggerContext loggerContext, LogF
     PatternLayout layout = PatternLayout.newBuilder()
       .withPattern(PatternLayout.DEFAULT_CONVERSION_PATTERN).build();
 
-    SizeBasedTriggeringPolicy sizeBasedTriggeringPolicy = SizeBasedTriggeringPolicy.createPolicy(
-      logFeederProps.getRolloverConfig().getRolloverSize());
+    String rolloverSize = logFeederProps.getRolloverConfig().getRolloverSize().toString() + logFeederProps.getRolloverConfig().getRolloverSizeFormat();
+    SizeBasedTriggeringPolicy sizeBasedTriggeringPolicy = SizeBasedTriggeringPolicy.createPolicy(rolloverSize);
 
     CustomTimeBasedTriggeringPolicy customTimeBasedTriggeringPolicy = CustomTimeBasedTriggeringPolicy
       .createPolicy(String.valueOf(logFeederProps.getRolloverConfig().getRolloverThresholdTimeMins()));
@@ -80,6 +81,9 @@ public static Logger createLogger(Input input, LoggerContext loggerContext, LogF
         .createPolicy(sizeBasedTriggeringPolicy, customTimeBasedTriggeringPolicy);
     }
 
+    DefaultRolloverStrategy defaultRolloverStrategy = DefaultRolloverStrategy.newBuilder().withMax(String.valueOf(
+      logFeederProps.getRolloverConfig().getRolloverMaxBackupFiles())).build();
+
     boolean immediateFlush = logFeederProps.getRolloverConfig().isImmediateFlush();
     RollingFileAppender appender = RollingFileAppender.newBuilder()
       .withFileName(fileName)
@@ -87,6 +91,7 @@ public static Logger createLogger(Input input, LoggerContext loggerContext, LogF
       .withLayout(layout)
       .withName(type)
       .withPolicy(compositeTriggeringPolicy)
+      .withStrategy(defaultRolloverStrategy)
       .withImmediateFlush(immediateFlush)
       .build();
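The factory change above concatenates the numeric size and the unit before handing it to Log4j2, and registers a DefaultRolloverStrategy so the backup-file cap is enforced. A minimal stand-alone sketch of that wiring under the same assumptions; the hard-coded values and file paths are placeholders standing in for RolloverConfig, not the project's actual factory:

import org.apache.logging.log4j.core.appender.RollingFileAppender;
import org.apache.logging.log4j.core.appender.rolling.DefaultRolloverStrategy;
import org.apache.logging.log4j.core.appender.rolling.SizeBasedTriggeringPolicy;
import org.apache.logging.log4j.core.layout.PatternLayout;

public class RolloverSketch {
  public static void main(String[] args) {
    // Placeholder values; in Log Feeder these come from RolloverConfig.
    Integer rolloverSize = 80;        // logfeeder.cloud.rollover.threshold.size
    String rolloverSizeUnit = "MB";   // logfeeder.cloud.rollover.threshold.size.unit
    Integer maxBackupFiles = 10;      // logfeeder.cloud.rollover.max.files

    // Log4j2 expects a single size string such as "80MB", so number and unit are concatenated.
    SizeBasedTriggeringPolicy sizePolicy =
        SizeBasedTriggeringPolicy.createPolicy(rolloverSize + rolloverSizeUnit);

    // Cap how many rolled-over files are kept on disk before upload.
    DefaultRolloverStrategy strategy = DefaultRolloverStrategy.newBuilder()
        .withMax(String.valueOf(maxBackupFiles))
        .build();

    RollingFileAppender appender = RollingFileAppender.newBuilder()
        .withFileName("/tmp/logfeeder/active/example.log")          // placeholder path
        .withFilePattern("/tmp/logfeeder/archived/example-%i.log.gz") // placeholder pattern
        .withName("example-cloud-appender")
        .withLayout(PatternLayout.newBuilder()
            .withPattern(PatternLayout.DEFAULT_CONVERSION_PATTERN).build())
        .withPolicy(sizePolicy)
        .withStrategy(strategy)
        .withImmediateFlush(true)
        .build();

    appender.start();
  }
}

The real factory additionally combines the size policy with the time-based policy through a CompositeTriggeringPolicy, as shown in the diff.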
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageUploader.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageUploader.java
index ea52de5b87..b76f441eea 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageUploader.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageUploader.java
@@ -74,7 +74,7 @@ public void run() {
    */
   void doUpload() {
     try {
-      final String archiveLogDir = String.join(File.separator, logFeederProps.getTmpDir(), uploaderType, "archived");
+      final String archiveLogDir = String.join(File.separator, logFeederProps.getRolloverConfig().getRolloverArchiveBaseDir(), uploaderType, "archived");
       if (new File(archiveLogDir).exists()) {
         String[] extensions = {"log", "gz"};
         Collection<File> filesToUpload = FileUtils.listFiles(new File(archiveLogDir), extensions, true);
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/ExternalHDFSUploadClient.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/ExternalHDFSUploadClient.java
index cabf0042b0..a23a7157f9 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/ExternalHDFSUploadClient.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/ExternalHDFSUploadClient.java
@@ -47,9 +47,9 @@ public ExternalHDFSUploadClient(ExternalHdfsOutputConfig hdfsOutputConfig) {
   @Override
   public void init(LogFeederProps logFeederProps) {
     logger.info("Initialize external HDFS client ...");
-    if (StringUtils.isNotBlank(hdfsOutputConfig.getHdfsUser())) {
-      logger.info("Using HADOOP_USER_NAME: {}", hdfsOutputConfig.getHdfsUser());
-      System.setProperty("HADOOP_USER_NAME", hdfsOutputConfig.getHdfsUser());
+    if (StringUtils.isNotBlank(logFeederProps.getLogfeederHdfsUser())) {
+      logger.info("Using HADOOP_USER_NAME: {}", logFeederProps.getLogfeederHdfsUser());
+      System.setProperty("HADOOP_USER_NAME", logFeederProps.getLogfeederHdfsUser());
     }
     this.fs = LogFeederHDFSUtil.buildFileSystem(
       hdfsOutputConfig.getHdfsHost(),
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSUploadClient.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSUploadClient.java
index 9e0a136cc0..c2a8497b0a 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSUploadClient.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSUploadClient.java
@@ -34,6 +34,8 @@
  */
 public class HDFSUploadClient implements UploadClient {
 
+  private static final String FS_DEFAULT_FS = "fs.defaultFS";
+
   private static final Logger logger = LogManager.getLogger(HDFSUploadClient.class);
 
   private FileSystem fs;
@@ -43,7 +45,11 @@ public void init(LogFeederProps logFeederProps) {
     logger.info("Initialize HDFS client (cloud mode), using core-site.xml from the classpath.");
     Configuration configuration = new Configuration();
     if (StringUtils.isNotBlank(logFeederProps.getCustomFs())) {
-      configuration.set("fs.defaultFS", logFeederProps.getCustomFs());
+      configuration.set(FS_DEFAULT_FS, logFeederProps.getCustomFs());
+    }
+    if (StringUtils.isNotBlank(logFeederProps.getLogfeederHdfsUser()) && isHadoopFileSystem(configuration)) {
+      logger.info("Using HADOOP_USER_NAME: {}", logFeederProps.getLogfeederHdfsUser());
+      System.setProperty("HADOOP_USER_NAME", logFeederProps.getLogfeederHdfsUser());
     }
     this.fs = LogFeederHDFSUtil.buildFileSystem(configuration);
   }
@@ -58,4 +64,8 @@ public void close() throws IOException {
     LogFeederHDFSUtil.closeFileSystem(fs);
   }
 
+  private boolean isHadoopFileSystem(Configuration conf) {
+    return conf.get(FS_DEFAULT_FS).contains("hdfs://");
+  }
+
 }
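Both upload clients now take the HADOOP_USER_NAME override from logfeeder.hdfs.user, and HDFSUploadClient only applies it when fs.defaultFS actually points at HDFS. A minimal sketch of that behaviour against the plain Hadoop client API; the user, URI and paths are placeholders, not values from the patch:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsUserOverrideSketch {
  public static void main(String[] args) throws IOException {
    // Placeholder values; Log Feeder reads these from logfeeder.properties.
    String hdfsUser = "hdfs";                                  // logfeeder.hdfs.user
    String defaultFs = "hdfs://c7401.ambari.apache.org:8020";  // custom fs URI (placeholder)

    Configuration configuration = new Configuration();
    configuration.set("fs.defaultFS", defaultFs);

    // Only override the Hadoop user when the target file system is really HDFS,
    // mirroring the isHadoopFileSystem() guard in the patch. The property must be
    // set before the FileSystem / UserGroupInformation is first initialized.
    if (configuration.get("fs.defaultFS").contains("hdfs://") && !hdfsUser.isEmpty()) {
      System.setProperty("HADOOP_USER_NAME", hdfsUser);
    }

    try (FileSystem fs = FileSystem.get(configuration)) {
      // Example upload of a rolled-over archive file (placeholder paths).
      fs.copyFromLocalFile(new Path("/tmp/logfeeder/archived/example.log.gz"),
          new Path("/apps/logfeeder/example.log.gz"));
    }
  }
}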
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/UploadClientFactory.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/UploadClientFactory.java
index 2865b28643..bea29438ab 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/UploadClientFactory.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/UploadClientFactory.java
@@ -42,6 +42,7 @@ public static UploadClient createUploadClient(LogFeederProps logFeederProps) {
     boolean useHdfsClient = logFeederProps.isUseCloudHdfsClient();
     if (useHdfsClient && checkCoreSiteIsOnClasspath(logFeederProps)) {
       logger.info("The core-site.xml from the classpath will be used to figure it out the cloud output settings.");
+      logFeederProps.setCloudStorageDestination(CloudStorageDestination.DEFAULT_FS);
       return new HDFSUploadClient();
     } else if (CloudStorageDestination.HDFS.equals(destType)) {
diff --git a/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties b/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties
index daa98216ff..c7ea335b25 100644
--- a/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties
+++ b/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties
@@ -53,13 +53,13 @@
 logfeeder.cloud.storage.bucket.bootstrap=true
 logfeeder.cloud.rollover.archive.base.dir=target/tmp
 logfeeder.cloud.rollover.threshold.min=1000
-logfeeder.cloud.rollover.threshold.size=1K
+logfeeder.cloud.rollover.threshold.size=1
+logfeeder.cloud.rollover.threshold.size.unit=K
 logfeeder.cloud.rollover.immediate.flush=true
 
 logfeeder.hdfs.host=c7401.ambari.apache.org
 logfeeder.hdfs.port=8020
 logfeeder.hdfs.user=hdfs
-logfeeder.hdfs.output.base.dir=/apps/logfeeder
 
 logfeeder.s3.endpoint=http://localhost:4569
 logfeeder.s3.secret.key=MySecretKey

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Log Feeder: send logs to cloud storage (gcs/s3 etc.)
> ----------------------------------------------------
>
>                 Key: AMBARI-24833
>                 URL: https://issues.apache.org/jira/browse/AMBARI-24833
>             Project: Ambari
>          Issue Type: Bug
>          Components: ambari-logsearch
>    Affects Versions: 2.7.0
>            Reporter: Olivér Szabó
>            Assignee: Olivér Szabó
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 2.8.0
>

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)