[ https://issues.apache.org/jira/browse/HADOOP-18258?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17581172#comment-17581172 ]

ASF GitHub Bot commented on HADOOP-18258:
-----------------------------------------

sravanigadey commented on code in PR #4383:
URL: https://github.com/apache/hadoop/pull/4383#discussion_r948693543


##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/mapreduce/S3AAuditLogMergerAndParser.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.audit.mapreduce;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.s3a.audit.AvroDataRecord;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.LineRecordReader;
+
+import static org.apache.hadoop.fs.s3a.audit.S3LogParser.AWS_LOG_REGEXP_GROUPS;
+import static org.apache.hadoop.fs.s3a.audit.S3LogParser.LOG_ENTRY_PATTERN;
+
+/**
+ * Merge all the audit logs present in a directory of
+ * multiple audit log files into a single audit log file,
+ * and parse the merged entries.
+ */
+public class S3AAuditLogMergerAndParser {
+
+  public static final int MAX_LINE_LENGTH = 32000;
+  private static final Logger LOG =
+      LoggerFactory.getLogger(S3AAuditLogMergerAndParser.class);
+
+  /**
+   * Parse a single audit log entry into key-value pairs
+   * using the S3LogParser regular expressions.
+   *
+   * @param singleAuditLog a single audit log line from the merged audit log file
+   * @return a map of key-value pairs parsed from the audit log line
+   */
+  public HashMap<String, String> parseAuditLog(String singleAuditLog) {
+    HashMap<String, String> auditLogMap = new HashMap<>();
+    if (singleAuditLog == null || singleAuditLog.length() == 0) {
+      LOG.debug(
+          "This is an empty string or null string, expected a valid string to 
parse");
+      return auditLogMap;
+    }
+    final Matcher matcher = LOG_ENTRY_PATTERN.matcher(singleAuditLog);
+    boolean patternMatching = matcher.matches();
+    if (patternMatching) {
+      for (String key : AWS_LOG_REGEXP_GROUPS) {
+        try {
+          final String value = matcher.group(key);
+          auditLogMap.put(key, value);
+        } catch (IllegalStateException e) {
+          LOG.debug(String.valueOf(e));
+        }
+      }
+    }
+    return auditLogMap;
+  }
+
+  /**
+   * Parse the HTTP referrer header, which is one of the
+   * key-value pairs of an audit log.
+   *
+   * @param referrerHeader the HTTP referrer header of a particular audit log
+   * @return a map of the key-value pairs present in the referrer header
+   */
+  public HashMap<String, String> parseReferrerHeader(String referrerHeader) {
+    HashMap<String, String> referrerHeaderMap = new HashMap<>();
+    if (referrerHeader == null || referrerHeader.length() == 0) {
+      LOG.debug(
+          "This is an empty string or null string, expected a valid string to 
parse");
+      return referrerHeaderMap;
+    }
+    int indexOfQuestionMark = referrerHeader.indexOf("?");
+    String httpReferrer = referrerHeader.substring(indexOfQuestionMark + 1,
+        referrerHeader.length() - 1);
+    int lengthOfReferrer = httpReferrer.length();
+    int start = 0;
+    while (start < lengthOfReferrer) {
+      int equals = httpReferrer.indexOf("=", start);
+      // no match : break
+      if (equals == -1) {
+        break;
+      }
+      String key = httpReferrer.substring(start, equals);
+      int end = httpReferrer.indexOf("&", equals);
+      // or end of string
+      if (end == -1) {
+        end = lengthOfReferrer;
+      }
+      String value = httpReferrer.substring(equals + 1, end);
+      referrerHeaderMap.put(key, value);
+      start = end + 1;
+    }
+    return referrerHeaderMap;
+  }
+
+  /**
+   * Merge and parse all the audit log files and convert data into avro file.
+   *
+   * @param fileSystem filesystem
+   * @param logsPath   source path of logs
+   * @param destPath   destination path of merged log file
+   * @return true on successful completion
+   * @throws IOException on any failure
+   */
+  public boolean mergeAndParseAuditLogFiles(FileSystem fileSystem,
+      Path logsPath,
+      Path destPath) throws IOException {
+
+    // List all the log files under the given path recursively
+    RemoteIterator<LocatedFileStatus> listOfLogFiles =
+        fileSystem.listFiles(logsPath, true);
+
+    Path destFile = new Path(destPath, "AuditLogFile");
+
+    try (FSDataOutputStream fsDataOutputStream = fileSystem.create(destFile)) {
+
+      // Iterating over the list of files to merge and parse
+      while (listOfLogFiles.hasNext()) {
+        FileStatus fileStatus = listOfLogFiles.next();
+        int fileLength = (int) fileStatus.getLen();
+        byte[] byteBuffer = new byte[fileLength];
+        try (FSDataInputStream fsDataInputStream =
+            fileSystem.open(fileStatus.getPath())) {
+
+          // Instantiating generated AvroDataRecord class
+          AvroDataRecord avroDataRecord = new AvroDataRecord();
+
+          // Instantiate DatumWriter class
+          DatumWriter<AvroDataRecord> datumWriter =
+              new SpecificDatumWriter<AvroDataRecord>(AvroDataRecord.class);
+          DataFileWriter<AvroDataRecord> dataFileWriter =
+              new DataFileWriter<AvroDataRecord>(datumWriter);
+
+          List<String> longValues =
+              Arrays.asList("turnaroundtime", "bytessent", "objectsize",

Review Comment:
   used S3LogParser constants
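
   For anyone trying this locally, a minimal sketch of exercising
   `parseAuditLog` (illustrative only: the driver class, the `args`
   handling and the sample input are assumptions, not part of the PR):

   ```java
   import java.util.Map;

   import org.apache.hadoop.fs.s3a.audit.mapreduce.S3AAuditLogMergerAndParser;

   public class ParseOneEntry {
     public static void main(String[] args) {
       // One full line from a merged audit log file; an empty or
       // non-matching line simply yields an empty map.
       String sampleEntry = args.length > 0 ? args[0] : "";
       S3AAuditLogMergerAndParser parser = new S3AAuditLogMergerAndParser();
       Map<String, String> fields = parser.parseAuditLog(sampleEntry);
       // Each named group in S3LogParser.AWS_LOG_REGEXP_GROUPS
       // (e.g. "bucket", "key", "referrer") becomes a map entry.
       System.out.println(fields.get("referrer"));
     }
   }
   ```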



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/mapreduce/S3AAuditLogMergerAndParser.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.audit.mapreduce;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.s3a.audit.AvroDataRecord;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.LineRecordReader;
+
+import static org.apache.hadoop.fs.s3a.audit.S3LogParser.AWS_LOG_REGEXP_GROUPS;
+import static org.apache.hadoop.fs.s3a.audit.S3LogParser.LOG_ENTRY_PATTERN;
+
+/**
+ * Merge all the audit logs present in a directory of
+ * multiple audit log files into a single audit log file,
+ * and parse the merged entries.
+ */
+public class S3AAuditLogMergerAndParser {
+
+  public static final int MAX_LINE_LENGTH = 32000;
+  private static final Logger LOG =
+      LoggerFactory.getLogger(S3AAuditLogMergerAndParser.class);
+
+  /**
+   * Parse a single audit log entry into key-value pairs
+   * using the S3LogParser regular expressions.
+   *
+   * @param singleAuditLog a single audit log line from the merged audit log file
+   * @return a map of key-value pairs parsed from the audit log line
+   */
+  public HashMap<String, String> parseAuditLog(String singleAuditLog) {
+    HashMap<String, String> auditLogMap = new HashMap<>();
+    if (singleAuditLog == null || singleAuditLog.length() == 0) {
+      LOG.debug(
+          "This is an empty string or null string, expected a valid string to 
parse");
+      return auditLogMap;
+    }
+    final Matcher matcher = LOG_ENTRY_PATTERN.matcher(singleAuditLog);
+    boolean patternMatching = matcher.matches();
+    if (patternMatching) {
+      for (String key : AWS_LOG_REGEXP_GROUPS) {
+        try {
+          final String value = matcher.group(key);
+          auditLogMap.put(key, value);
+        } catch (IllegalStateException e) {
+          LOG.debug(String.valueOf(e));
+        }
+      }
+    }
+    return auditLogMap;
+  }
+
+  /**
+   * Parse the HTTP referrer header, which is one of the
+   * key-value pairs of an audit log.
+   *
+   * @param referrerHeader the HTTP referrer header of a particular audit log
+   * @return a map of the key-value pairs present in the referrer header
+   */
+  public HashMap<String, String> parseReferrerHeader(String referrerHeader) {
+    HashMap<String, String> referrerHeaderMap = new HashMap<>();
+    if (referrerHeader == null || referrerHeader.length() == 0) {
+      LOG.debug(
+          "This is an empty string or null string, expected a valid string to 
parse");
+      return referrerHeaderMap;
+    }
+    int indexOfQuestionMark = referrerHeader.indexOf("?");
+    String httpReferrer = referrerHeader.substring(indexOfQuestionMark + 1,
+        referrerHeader.length() - 1);
+    int lengthOfReferrer = httpReferrer.length();
+    int start = 0;
+    while (start < lengthOfReferrer) {
+      int equals = httpReferrer.indexOf("=", start);
+      // no match : break
+      if (equals == -1) {
+        break;
+      }
+      String key = httpReferrer.substring(start, equals);
+      int end = httpReferrer.indexOf("&", equals);
+      // or end of string
+      if (end == -1) {
+        end = lengthOfReferrer;
+      }
+      String value = httpReferrer.substring(equals + 1, end);
+      referrerHeaderMap.put(key, value);
+      start = end + 1;
+    }
+    return referrerHeaderMap;
+  }
+
+  /**
+   * Merge and parse all the audit log files and convert data into avro file.
+   *
+   * @param fileSystem filesystem
+   * @param logsPath   source path of logs
+   * @param destPath   destination path of merged log file
+   * @return true on successful completion
+   * @throws IOException on any failure
+   */
+  public boolean mergeAndParseAuditLogFiles(FileSystem fileSystem,
+      Path logsPath,
+      Path destPath) throws IOException {
+
+    // List all the log files under the given path recursively
+    RemoteIterator<LocatedFileStatus> listOfLogFiles =
+        fileSystem.listFiles(logsPath, true);
+
+    Path destFile = new Path(destPath, "AuditLogFile");
+
+    try (FSDataOutputStream fsDataOutputStream = fileSystem.create(destFile)) {
+
+      // Iterating over the list of files to merge and parse
+      while (listOfLogFiles.hasNext()) {
+        FileStatus fileStatus = listOfLogFiles.next();
+        int fileLength = (int) fileStatus.getLen();
+        byte[] byteBuffer = new byte[fileLength];
+        try (FSDataInputStream fsDataInputStream =
+            fileSystem.open(fileStatus.getPath())) {
+
+          // Instantiating generated AvroDataRecord class
+          AvroDataRecord avroDataRecord = new AvroDataRecord();
+
+          // Instantiate DatumWriter class
+          DatumWriter<AvroDataRecord> datumWriter =
+              new SpecificDatumWriter<AvroDataRecord>(AvroDataRecord.class);
+          DataFileWriter<AvroDataRecord> dataFileWriter =
+              new DataFileWriter<AvroDataRecord>(datumWriter);
+
+          List<String> longValues =
+              Arrays.asList("turnaroundtime", "bytessent", "objectsize",
+                  "totaltime");
+
+          // Write the Avro data to a file under the destination path
+          Path avroFile = new Path(destPath, "AvroData.avro");
+
+          // Reading the file data using LineRecordReader
+          LineRecordReader lineRecordReader =
+              new LineRecordReader(fsDataInputStream, 0L, fileLength,
+                  MAX_LINE_LENGTH);
+          LongWritable k = new LongWritable();
+          Text singleAuditLog = new Text();
+
+          try (FSDataOutputStream fsDataOutputStreamAvro = fileSystem.create(
+              avroFile)) {
+            // adding schema, output stream to DataFileWriter
+            dataFileWriter.create(AvroDataRecord.getClassSchema(),
+                fsDataOutputStreamAvro);
+
+            // Parse every audit log line from the merged logs
+            while (lineRecordReader.next(k, singleAuditLog)) {
+              // Parse audit log except referrer header
+              HashMap<String, String> auditLogMap =
+                  parseAuditLog(singleAuditLog.toString());
+
+              String referrerHeader = auditLogMap.get("referrer");
+              if (referrerHeader == null || referrerHeader.equals("-")) {
+                LOG.debug("Log didn't parsed : {}", referrerHeader);

Review Comment:
   modified
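
   To illustrate what `parseReferrerHeader` extracts, a small sketch (the
   referrer value is invented; it only mimics the quoted query-string form
   the header takes in S3 access logs):

   ```java
   import java.util.Map;

   import org.apache.hadoop.fs.s3a.audit.mapreduce.S3AAuditLogMergerAndParser;

   public class ParseReferrer {
     public static void main(String[] args) {
       // Invented referrer header: context key-value pairs follow the "?"
       // as a query string, and the whole value is quoted in the log.
       String referrer =
           "\"https://audit.example.org/hadoop/1/op_open/?op=op_open&p1=s3a://bucket/file&id=1234\"";
       Map<String, String> context =
           new S3AAuditLogMergerAndParser().parseReferrerHeader(referrer);
       // Prints op=op_open, p1=s3a://bucket/file and id=1234: the substring
       // after "?" (minus the closing quote) is split on "&" and "=".
       context.forEach((k, v) -> System.out.println(k + "=" + v));
     }
   }
   ```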
   





> Merging of S3A Audit Logs
> -------------------------
>
>                 Key: HADOOP-18258
>                 URL: https://issues.apache.org/jira/browse/HADOOP-18258
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/s3
>            Reporter: Sravani Gadey
>            Assignee: Sravani Gadey
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 12.5h
>  Remaining Estimate: 0h
>
> Merging audit log files containing a huge number of audit logs collected from
> a job, such as a Hive or Spark job, which issues various S3 requests such as
> list, head, get and put requests.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org
