[ https://issues.apache.org/jira/browse/HADOOP-18258?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17581172#comment-17581172 ]
ASF GitHub Bot commented on HADOOP-18258: ----------------------------------------- sravanigadey commented on code in PR #4383: URL: https://github.com/apache/hadoop/pull/4383#discussion_r948693543 ########## hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/mapreduce/S3AAuditLogMergerAndParser.java: ########## @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.audit.mapreduce; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.specific.SpecificDatumWriter; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.s3a.audit.AvroDataRecord; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.LineRecordReader; + +import static org.apache.hadoop.fs.s3a.audit.S3LogParser.AWS_LOG_REGEXP_GROUPS; +import static org.apache.hadoop.fs.s3a.audit.S3LogParser.LOG_ENTRY_PATTERN; + +/** + * Merge all the audit logs present in a directory of + * multiple audit log files into a single audit log file. + */ +public class S3AAuditLogMergerAndParser { + + public static final int MAX_LINE_LENGTH = 32000; + private static final Logger LOG = + LoggerFactory.getLogger(S3AAuditLogMergerAndParser.class); + + /** + * parseAuditLog method helps in parsing the audit log + * into key-value pairs using regular expressions. 
+ * + * @param singleAuditLog this is single audit log from merged audit log file + * @return it returns a map i.e, auditLogMap which contains key-value pairs of a single audit log + */ + public HashMap<String, String> parseAuditLog(String singleAuditLog) { + HashMap<String, String> auditLogMap = new HashMap<>(); + if (singleAuditLog == null || singleAuditLog.length() == 0) { + LOG.debug( + "This is an empty string or null string, expected a valid string to parse"); + return auditLogMap; + } + final Matcher matcher = LOG_ENTRY_PATTERN.matcher(singleAuditLog); + boolean patternMatching = matcher.matches(); + if (patternMatching) { + for (String key : AWS_LOG_REGEXP_GROUPS) { + try { + final String value = matcher.group(key); + auditLogMap.put(key, value); + } catch (IllegalStateException e) { + LOG.debug(String.valueOf(e)); + } + } + } + return auditLogMap; + } + + /** + * parseReferrerHeader method helps in parsing the http referrer header. + * which is one of the key-value pair of audit log + * + * @param referrerHeader this is the http referrer header of a particular audit log + * @return it returns a map i.e, auditLogMap which contains key-value pairs + * of audit log as well as referrer header present in it + */ + public HashMap<String, String> parseReferrerHeader(String referrerHeader) { + HashMap<String, String> referrerHeaderMap = new HashMap<>(); + if (referrerHeader == null || referrerHeader.length() == 0) { + LOG.debug( + "This is an empty string or null string, expected a valid string to parse"); + return referrerHeaderMap; + } + int indexOfQuestionMark = referrerHeader.indexOf("?"); + String httpReferrer = referrerHeader.substring(indexOfQuestionMark + 1, + referrerHeader.length() - 1); + int lengthOfReferrer = httpReferrer.length(); + int start = 0; + while (start < lengthOfReferrer) { + int equals = httpReferrer.indexOf("=", start); + // no match : break + if (equals == -1) { + break; + } + String key = httpReferrer.substring(start, equals); + int end 
= httpReferrer.indexOf("&", equals); + // or end of string + if (end == -1) { + end = lengthOfReferrer; + } + String value = httpReferrer.substring(equals + 1, end); + referrerHeaderMap.put(key, value); + start = end + 1; + } + return referrerHeaderMap; + } + + /** + * Merge and parse all the audit log files and convert data into avro file. + * + * @param fileSystem filesystem + * @param logsPath source path of logs + * @param destPath destination path of merged log file + * @return true + * @throws IOException on any failure + */ + public boolean mergeAndParseAuditLogFiles(FileSystem fileSystem, + Path logsPath, + Path destPath) throws IOException { + + // Listing file in given path + RemoteIterator<LocatedFileStatus> listOfLogFiles = + fileSystem.listFiles(logsPath, true); + + Path destFile = new Path(destPath, "AuditLogFile"); + + try (FSDataOutputStream fsDataOutputStream = fileSystem.create(destFile)) { + + // Iterating over the list of files to merge and parse + while (listOfLogFiles.hasNext()) { + FileStatus fileStatus = listOfLogFiles.next(); + int fileLength = (int) fileStatus.getLen(); + byte[] byteBuffer = new byte[fileLength]; + try (FSDataInputStream fsDataInputStream = + fileSystem.open(fileStatus.getPath())) { + + // Instantiating generated AvroDataRecord class + AvroDataRecord avroDataRecord = new AvroDataRecord(); + + // Instantiate DatumWriter class + DatumWriter<AvroDataRecord> datumWriter = + new SpecificDatumWriter<AvroDataRecord>(AvroDataRecord.class); + DataFileWriter<AvroDataRecord> dataFileWriter = + new DataFileWriter<AvroDataRecord>(datumWriter); + + List<String> longValues = + Arrays.asList("turnaroundtime", "bytessent", "objectsize", Review Comment: used S3LogParser constants ########## hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/mapreduce/S3AAuditLogMergerAndParser.java: ########## @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.audit.mapreduce; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.specific.SpecificDatumWriter; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.s3a.audit.AvroDataRecord; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.LineRecordReader; + +import static org.apache.hadoop.fs.s3a.audit.S3LogParser.AWS_LOG_REGEXP_GROUPS; +import static org.apache.hadoop.fs.s3a.audit.S3LogParser.LOG_ENTRY_PATTERN; + +/** + * Merge all the audit logs present in a directory of + * multiple audit log files into a single audit log file. 
+ */ +public class S3AAuditLogMergerAndParser { + + public static final int MAX_LINE_LENGTH = 32000; + private static final Logger LOG = + LoggerFactory.getLogger(S3AAuditLogMergerAndParser.class); + + /** + * parseAuditLog method helps in parsing the audit log + * into key-value pairs using regular expressions. + * + * @param singleAuditLog this is single audit log from merged audit log file + * @return it returns a map i.e, auditLogMap which contains key-value pairs of a single audit log + */ + public HashMap<String, String> parseAuditLog(String singleAuditLog) { + HashMap<String, String> auditLogMap = new HashMap<>(); + if (singleAuditLog == null || singleAuditLog.length() == 0) { + LOG.debug( + "This is an empty string or null string, expected a valid string to parse"); + return auditLogMap; + } + final Matcher matcher = LOG_ENTRY_PATTERN.matcher(singleAuditLog); + boolean patternMatching = matcher.matches(); + if (patternMatching) { + for (String key : AWS_LOG_REGEXP_GROUPS) { + try { + final String value = matcher.group(key); + auditLogMap.put(key, value); + } catch (IllegalStateException e) { + LOG.debug(String.valueOf(e)); + } + } + } + return auditLogMap; + } + + /** + * parseReferrerHeader method helps in parsing the http referrer header. 
+ * which is one of the key-value pair of audit log + * + * @param referrerHeader this is the http referrer header of a particular audit log + * @return it returns a map i.e, auditLogMap which contains key-value pairs + * of audit log as well as referrer header present in it + */ + public HashMap<String, String> parseReferrerHeader(String referrerHeader) { + HashMap<String, String> referrerHeaderMap = new HashMap<>(); + if (referrerHeader == null || referrerHeader.length() == 0) { + LOG.debug( + "This is an empty string or null string, expected a valid string to parse"); + return referrerHeaderMap; + } + int indexOfQuestionMark = referrerHeader.indexOf("?"); + String httpReferrer = referrerHeader.substring(indexOfQuestionMark + 1, + referrerHeader.length() - 1); + int lengthOfReferrer = httpReferrer.length(); + int start = 0; + while (start < lengthOfReferrer) { + int equals = httpReferrer.indexOf("=", start); + // no match : break + if (equals == -1) { + break; + } + String key = httpReferrer.substring(start, equals); + int end = httpReferrer.indexOf("&", equals); + // or end of string + if (end == -1) { + end = lengthOfReferrer; + } + String value = httpReferrer.substring(equals + 1, end); + referrerHeaderMap.put(key, value); + start = end + 1; + } + return referrerHeaderMap; + } + + /** + * Merge and parse all the audit log files and convert data into avro file. 
+ * + * @param fileSystem filesystem + * @param logsPath source path of logs + * @param destPath destination path of merged log file + * @return true + * @throws IOException on any failure + */ + public boolean mergeAndParseAuditLogFiles(FileSystem fileSystem, + Path logsPath, + Path destPath) throws IOException { + + // Listing file in given path + RemoteIterator<LocatedFileStatus> listOfLogFiles = + fileSystem.listFiles(logsPath, true); + + Path destFile = new Path(destPath, "AuditLogFile"); + + try (FSDataOutputStream fsDataOutputStream = fileSystem.create(destFile)) { + + // Iterating over the list of files to merge and parse + while (listOfLogFiles.hasNext()) { + FileStatus fileStatus = listOfLogFiles.next(); + int fileLength = (int) fileStatus.getLen(); + byte[] byteBuffer = new byte[fileLength]; + try (FSDataInputStream fsDataInputStream = + fileSystem.open(fileStatus.getPath())) { + + // Instantiating generated AvroDataRecord class + AvroDataRecord avroDataRecord = new AvroDataRecord(); + + // Instantiate DatumWriter class + DatumWriter<AvroDataRecord> datumWriter = + new SpecificDatumWriter<AvroDataRecord>(AvroDataRecord.class); + DataFileWriter<AvroDataRecord> dataFileWriter = + new DataFileWriter<AvroDataRecord>(datumWriter); + + List<String> longValues = + Arrays.asList("turnaroundtime", "bytessent", "objectsize", + "totaltime"); + + // Write avro data into a file in bucket destination path + Path avroFile = new Path(destPath, "AvroData.avro"); + + // Reading the file data using LineRecordReader + LineRecordReader lineRecordReader = + new LineRecordReader(fsDataInputStream, 0L, fileLength, + MAX_LINE_LENGTH); + LongWritable k = new LongWritable(); + Text singleAuditLog = new Text(); + + try (FSDataOutputStream fsDataOutputStreamAvro = fileSystem.create( + avroFile)) { + // adding schema, output stream to DataFileWriter + dataFileWriter.create(AvroDataRecord.getClassSchema(), + fsDataOutputStreamAvro); + + // Parse each and every audit log from list of 
logs + while (lineRecordReader.next(k, singleAuditLog)) { + // Parse audit log except referrer header + HashMap<String, String> auditLogMap = + parseAuditLog(singleAuditLog.toString()); + + String referrerHeader = auditLogMap.get("referrer"); + if (referrerHeader == null || referrerHeader.equals("-")) { + LOG.debug("Log didn't parsed : {}", referrerHeader); Review Comment: modified > Merging of S3A Audit Logs > ------------------------- > > Key: HADOOP-18258 > URL: https://issues.apache.org/jira/browse/HADOOP-18258 > Project: Hadoop Common > Issue Type: Sub-task > Components: fs/s3 > Reporter: Sravani Gadey > Assignee: Sravani Gadey > Priority: Major > Labels: pull-request-available > Time Spent: 12.5h > Remaining Estimate: 0h > > Merging audit log files that contain a huge number of audit logs collected from a > job, such as a Hive or Spark job, containing various S3 requests like list, head, > get and put requests. -- This message was sent by Atlassian Jira (v8.20.10#820010) --------------------------------------------------------------------- To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-issues-h...@hadoop.apache.org