[ https://issues.apache.org/jira/browse/HADOOP-18258?focusedWorklogId=783992&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-783992 ]
ASF GitHub Bot logged work on HADOOP-18258: ------------------------------------------- Author: ASF GitHub Bot Created on: 22/Jun/22 22:59 Start Date: 22/Jun/22 22:59 Worklog Time Spent: 10m Work Description: mukund-thakur commented on code in PR #4383: URL: https://github.com/apache/hadoop/pull/4383#discussion_r904350867 ########## hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AuditTool.java: ########## @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.audit; + +import java.io.Closeable; +import java.io.EOFException; +import java.io.File; +import java.io.IOException; +import java.io.PrintWriter; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FilterFileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.util.ExitUtil; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +import static org.apache.hadoop.service.launcher.LauncherExitCodes.EXIT_COMMAND_ARGUMENT_ERROR; +import static org.apache.hadoop.service.launcher.LauncherExitCodes.EXIT_SERVICE_UNAVAILABLE; +import static org.apache.hadoop.service.launcher.LauncherExitCodes.EXIT_SUCCESS; + +/** + * AuditTool is a Command Line Interface. + * i.e, it's functionality is to parse the merged audit log file. + * and generate avro file. + */ +public class AuditTool extends Configured implements Tool, Closeable { + + private static final Logger LOG = LoggerFactory.getLogger(AuditTool.class); + + private final String entryPoint = "s3audit"; + + private PrintWriter out; + + // Exit codes + private static final int SUCCESS = EXIT_SUCCESS; + private static final int INVALID_ARGUMENT = EXIT_COMMAND_ARGUMENT_ERROR; + + /** + * Error String when the wrong FS is used for binding: {@value}. 
+ **/ + @VisibleForTesting + public static final String WRONG_FILESYSTEM = "Wrong filesystem for "; + + private final String usage = entryPoint + " s3a://BUCKET\n"; + + public AuditTool() { + } + + /** + * Tells us the usage of the AuditTool by commands. + * + * @return the string USAGE + */ + public String getUsage() { + return usage; + } + + /** + * This run method in AuditTool takes S3 bucket path. + * which contains audit log files from command line arguments. + * and merge the audit log files present in that path into single file in. + * local system. + * + * @param args command specific arguments. + * @return SUCCESS i.e, '0', which is an exit code. + * @throws Exception on any failure. + */ + @Override + public int run(String[] args) throws Exception { + List<String> argv = new ArrayList<>(Arrays.asList(args)); + if (argv.isEmpty()) { + errorln(getUsage()); + throw invalidArgs("No bucket specified"); + } + //Path of audit log files in s3 bucket + Path s3LogsPath = new Path(argv.get(0)); + + //Setting the file system + URI fsURI = toUri(String.valueOf(s3LogsPath)); + S3AFileSystem s3AFileSystem = + bindFilesystem(FileSystem.newInstance(fsURI, getConf())); + RemoteIterator<LocatedFileStatus> listOfS3LogFiles = + s3AFileSystem.listFiles(s3LogsPath, true); + + //Merging local audit files into a single file + File s3aLogsDirectory = new File(s3LogsPath.getName()); + boolean s3aLogsDirectoryCreation = false; + if (!s3aLogsDirectory.exists()) { + s3aLogsDirectoryCreation = s3aLogsDirectory.mkdir(); + } + if(s3aLogsDirectoryCreation) { + while (listOfS3LogFiles.hasNext()) { + Path s3LogFilePath = listOfS3LogFiles.next().getPath(); + File s3LogLocalFilePath = + new File(s3aLogsDirectory, s3LogFilePath.getName()); + boolean localFileCreation = s3LogLocalFilePath.createNewFile(); + if (localFileCreation) { + FileStatus fileStatus = s3AFileSystem.getFileStatus(s3LogFilePath); + long s3LogFileLength = fileStatus.getLen(); + //Reads s3 file data into byte buffer + byte[] 
s3LogDataBuffer = + readDataset(s3AFileSystem, s3LogFilePath, (int) s3LogFileLength); + //Writes byte array into local file + FileUtils.writeByteArrayToFile(s3LogLocalFilePath, s3LogDataBuffer); + } + } + } + + //Calls S3AAuditLogMerger for implementing merging code + //by passing local audit log files directory which are copied from s3 bucket + S3AAuditLogMerger s3AAuditLogMerger = new S3AAuditLogMerger(); + s3AAuditLogMerger.mergeFiles(s3aLogsDirectory.getPath()); Review Comment: merge files should return true/false ########## hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AuditTool.java: ########## @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.audit; + +import java.io.Closeable; +import java.io.EOFException; +import java.io.File; +import java.io.IOException; +import java.io.PrintWriter; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FilterFileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.util.ExitUtil; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +import static org.apache.hadoop.service.launcher.LauncherExitCodes.EXIT_COMMAND_ARGUMENT_ERROR; +import static org.apache.hadoop.service.launcher.LauncherExitCodes.EXIT_SERVICE_UNAVAILABLE; +import static org.apache.hadoop.service.launcher.LauncherExitCodes.EXIT_SUCCESS; + +/** + * AuditTool is a Command Line Interface. + * i.e, it's functionality is to parse the merged audit log file. + * and generate avro file. + */ +public class AuditTool extends Configured implements Tool, Closeable { + + private static final Logger LOG = LoggerFactory.getLogger(AuditTool.class); + + private final String entryPoint = "s3audit"; + + private PrintWriter out; + + // Exit codes + private static final int SUCCESS = EXIT_SUCCESS; + private static final int INVALID_ARGUMENT = EXIT_COMMAND_ARGUMENT_ERROR; + + /** + * Error String when the wrong FS is used for binding: {@value}. 
+ **/ + @VisibleForTesting + public static final String WRONG_FILESYSTEM = "Wrong filesystem for "; Review Comment: Instead, you could throw an UnsupportedOperationException with a "not supported" message. What if we do something similar for Azure in the future? ########## hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AuditTool.java: ########## @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.fs.s3a.audit; + +import java.io.Closeable; +import java.io.EOFException; +import java.io.File; +import java.io.IOException; +import java.io.PrintWriter; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FilterFileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.util.ExitUtil; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +import static org.apache.hadoop.service.launcher.LauncherExitCodes.EXIT_COMMAND_ARGUMENT_ERROR; +import static org.apache.hadoop.service.launcher.LauncherExitCodes.EXIT_SERVICE_UNAVAILABLE; +import static org.apache.hadoop.service.launcher.LauncherExitCodes.EXIT_SUCCESS; + +/** + * AuditTool is a Command Line Interface. + * i.e, it's functionality is to parse the merged audit log file. + * and generate avro file. + */ +public class AuditTool extends Configured implements Tool, Closeable { + + private static final Logger LOG = LoggerFactory.getLogger(AuditTool.class); + + private final String entryPoint = "s3audit"; + + private PrintWriter out; + + // Exit codes + private static final int SUCCESS = EXIT_SUCCESS; + private static final int INVALID_ARGUMENT = EXIT_COMMAND_ARGUMENT_ERROR; + + /** + * Error String when the wrong FS is used for binding: {@value}. 
+ **/ + @VisibleForTesting + public static final String WRONG_FILESYSTEM = "Wrong filesystem for "; + + private final String usage = entryPoint + " s3a://BUCKET\n"; + + public AuditTool() { + } + + /** + * Tells us the usage of the AuditTool by commands. + * + * @return the string USAGE + */ + public String getUsage() { + return usage; + } + + /** + * This run method in AuditTool takes S3 bucket path. + * which contains audit log files from command line arguments. + * and merge the audit log files present in that path into single file in. + * local system. + * + * @param args command specific arguments. + * @return SUCCESS i.e, '0', which is an exit code. + * @throws Exception on any failure. + */ + @Override + public int run(String[] args) throws Exception { + List<String> argv = new ArrayList<>(Arrays.asList(args)); + if (argv.isEmpty()) { + errorln(getUsage()); + throw invalidArgs("No bucket specified"); + } + //Path of audit log files in s3 bucket + Path s3LogsPath = new Path(argv.get(0)); + + //Setting the file system + URI fsURI = toUri(String.valueOf(s3LogsPath)); + S3AFileSystem s3AFileSystem = + bindFilesystem(FileSystem.newInstance(fsURI, getConf())); + RemoteIterator<LocatedFileStatus> listOfS3LogFiles = + s3AFileSystem.listFiles(s3LogsPath, true); + + //Merging local audit files into a single file + File s3aLogsDirectory = new File(s3LogsPath.getName()); + boolean s3aLogsDirectoryCreation = false; + if (!s3aLogsDirectory.exists()) { + s3aLogsDirectoryCreation = s3aLogsDirectory.mkdir(); + } + if(s3aLogsDirectoryCreation) { + while (listOfS3LogFiles.hasNext()) { + Path s3LogFilePath = listOfS3LogFiles.next().getPath(); + File s3LogLocalFilePath = + new File(s3aLogsDirectory, s3LogFilePath.getName()); + boolean localFileCreation = s3LogLocalFilePath.createNewFile(); + if (localFileCreation) { + FileStatus fileStatus = s3AFileSystem.getFileStatus(s3LogFilePath); + long s3LogFileLength = fileStatus.getLen(); + //Reads s3 file data into byte buffer + byte[] 
s3LogDataBuffer = Review Comment: Yes, reading the whole file into a single byte array like this is going to cause an OOM for bigger files. ########## hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/S3AAuditLogMerger.java: ########## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.audit; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.PrintWriter; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Merge all the audit logs present in a directory of. + * multiple audit log files into a single audit log file. + */ +public class S3AAuditLogMerger { + + private static final Logger LOG = + LoggerFactory.getLogger(S3AAuditLogMerger.class); + + /** + * Merge all the audit log files from a directory into single audit log file. + * @param auditLogsDirectoryPath path where audit log files are present. + * @throws IOException on any failure.
+ */ + public void mergeFiles(String auditLogsDirectoryPath) throws IOException { + File auditLogFilesDirectory = new File(auditLogsDirectoryPath); + String[] auditLogFileNames = auditLogFilesDirectory.list(); + + //Merging of audit log files present in a directory into a single audit log file + if (auditLogFileNames != null && auditLogFileNames.length != 0) { + File auditLogFile = new File("AuditLogFile"); + try (PrintWriter printWriter = new PrintWriter(auditLogFile, + "UTF-8")) { + for (String singleAuditLogFileName : auditLogFileNames) { + File singleAuditLogFile = + new File(auditLogFilesDirectory, singleAuditLogFileName); + try (BufferedReader bufferedReader = + new BufferedReader( + new InputStreamReader(new FileInputStream(singleAuditLogFile), + "UTF-8"))) { + String singleLine = bufferedReader.readLine(); + while (singleLine != null) { + printWriter.println(singleLine); Review Comment: Yes, convert to key -> value pair for each log line. ########## hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/S3AAuditLogMerger.java: ########## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.audit; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.PrintWriter; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Merge all the audit logs present in a directory of. + * multiple audit log files into a single audit log file. + */ +public class S3AAuditLogMerger { + + private static final Logger LOG = + LoggerFactory.getLogger(S3AAuditLogMerger.class); + + /** + * Merge all the audit log files from a directory into single audit log file. + * @param auditLogsDirectoryPath path where audit log files are present. + * @throws IOException on any failure. + */ + public void mergeFiles(String auditLogsDirectoryPath) throws IOException { + File auditLogFilesDirectory = new File(auditLogsDirectoryPath); + String[] auditLogFileNames = auditLogFilesDirectory.list(); + + //Merging of audit log files present in a directory into a single audit log file + if (auditLogFileNames != null && auditLogFileNames.length != 0) { + File auditLogFile = new File("AuditLogFile"); Review Comment: Why not use iterator? ########## hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AuditTool.java: ########## @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.audit; + +import java.io.Closeable; +import java.io.EOFException; +import java.io.File; +import java.io.IOException; +import java.io.PrintWriter; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FilterFileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.util.ExitUtil; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +import static org.apache.hadoop.service.launcher.LauncherExitCodes.EXIT_COMMAND_ARGUMENT_ERROR; +import static org.apache.hadoop.service.launcher.LauncherExitCodes.EXIT_SERVICE_UNAVAILABLE; +import static org.apache.hadoop.service.launcher.LauncherExitCodes.EXIT_SUCCESS; + +/** + * AuditTool is a Command Line Interface. + * i.e, it's functionality is to parse the merged audit log file. + * and generate avro file. 
+ */ +public class AuditTool extends Configured implements Tool, Closeable { + + private static final Logger LOG = LoggerFactory.getLogger(AuditTool.class); + + private final String entryPoint = "s3audit"; + + private PrintWriter out; + + // Exit codes + private static final int SUCCESS = EXIT_SUCCESS; + private static final int INVALID_ARGUMENT = EXIT_COMMAND_ARGUMENT_ERROR; + + /** + * Error String when the wrong FS is used for binding: {@value}. + **/ + @VisibleForTesting + public static final String WRONG_FILESYSTEM = "Wrong filesystem for "; + + private final String usage = entryPoint + " s3a://BUCKET\n"; + + public AuditTool() { + } + + /** + * Tells us the usage of the AuditTool by commands. + * + * @return the string USAGE + */ + public String getUsage() { + return usage; + } + + /** + * This run method in AuditTool takes S3 bucket path. + * which contains audit log files from command line arguments. + * and merge the audit log files present in that path into single file in. + * local system. + * + * @param args command specific arguments. + * @return SUCCESS i.e, '0', which is an exit code. + * @throws Exception on any failure. + */ + @Override + public int run(String[] args) throws Exception { + List<String> argv = new ArrayList<>(Arrays.asList(args)); + if (argv.isEmpty()) { + errorln(getUsage()); + throw invalidArgs("No bucket specified"); + } + //Path of audit log files in s3 bucket + Path s3LogsPath = new Path(argv.get(0)); + Review Comment: yes take both input and output path as argument and do validation like, logs path should be a directory with some files, if empty fail, if not exists fail etc. Destination path should be empty if already exists or shouldn't exist. Or print warning message that I am going to overwrite it. 
########## hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/S3AAuditLogMerger.java: ########## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.audit; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.PrintWriter; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Merge all the audit logs present in a directory of. + * multiple audit log files into a single audit log file. + */ +public class S3AAuditLogMerger { + + private static final Logger LOG = + LoggerFactory.getLogger(S3AAuditLogMerger.class); + + /** + * Merge all the audit log files from a directory into single audit log file. + * @param auditLogsDirectoryPath path where audit log files are present. + * @throws IOException on any failure. 
+ */ + public void mergeFiles(String auditLogsDirectoryPath) throws IOException { + File auditLogFilesDirectory = new File(auditLogsDirectoryPath); + String[] auditLogFileNames = auditLogFilesDirectory.list(); + + //Merging of audit log files present in a directory into a single audit log file + if (auditLogFileNames != null && auditLogFileNames.length != 0) { + File auditLogFile = new File("AuditLogFile"); + try (PrintWriter printWriter = new PrintWriter(auditLogFile, + "UTF-8")) { + for (String singleAuditLogFileName : auditLogFileNames) { + File singleAuditLogFile = + new File(auditLogFilesDirectory, singleAuditLogFileName); + try (BufferedReader bufferedReader = + new BufferedReader( + new InputStreamReader(new FileInputStream(singleAuditLogFile), + "UTF-8"))) { Review Comment: try to refactor into smaller methods and write unit tests for them separately. ########## hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AuditTool.java: ########## @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.audit; + +import java.io.Closeable; +import java.io.EOFException; +import java.io.File; +import java.io.IOException; +import java.io.PrintWriter; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FilterFileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.util.ExitUtil; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +import static org.apache.hadoop.service.launcher.LauncherExitCodes.EXIT_COMMAND_ARGUMENT_ERROR; +import static org.apache.hadoop.service.launcher.LauncherExitCodes.EXIT_SERVICE_UNAVAILABLE; +import static org.apache.hadoop.service.launcher.LauncherExitCodes.EXIT_SUCCESS; + +/** + * AuditTool is a Command Line Interface. + * i.e, it's functionality is to parse the merged audit log file. + * and generate avro file. + */ +public class AuditTool extends Configured implements Tool, Closeable { + + private static final Logger LOG = LoggerFactory.getLogger(AuditTool.class); + + private final String entryPoint = "s3audit"; + + private PrintWriter out; + + // Exit codes + private static final int SUCCESS = EXIT_SUCCESS; + private static final int INVALID_ARGUMENT = EXIT_COMMAND_ARGUMENT_ERROR; + + /** + * Error String when the wrong FS is used for binding: {@value}. 
+ **/ + @VisibleForTesting + public static final String WRONG_FILESYSTEM = "Wrong filesystem for "; + + private final String usage = entryPoint + " s3a://BUCKET\n"; + + public AuditTool() { + } + + /** + * Tells us the usage of the AuditTool by commands. + * + * @return the string USAGE + */ + public String getUsage() { + return usage; + } + + /** + * This run method in AuditTool takes S3 bucket path. + * which contains audit log files from command line arguments. + * and merge the audit log files present in that path into single file in. + * local system. + * + * @param args command specific arguments. + * @return SUCCESS i.e, '0', which is an exit code. + * @throws Exception on any failure. + */ + @Override + public int run(String[] args) throws Exception { + List<String> argv = new ArrayList<>(Arrays.asList(args)); + if (argv.isEmpty()) { + errorln(getUsage()); + throw invalidArgs("No bucket specified"); + } + //Path of audit log files in s3 bucket + Path s3LogsPath = new Path(argv.get(0)); + + //Setting the file system + URI fsURI = toUri(String.valueOf(s3LogsPath)); + S3AFileSystem s3AFileSystem = + bindFilesystem(FileSystem.newInstance(fsURI, getConf())); + RemoteIterator<LocatedFileStatus> listOfS3LogFiles = + s3AFileSystem.listFiles(s3LogsPath, true); + + //Merging local audit files into a single file + File s3aLogsDirectory = new File(s3LogsPath.getName()); + boolean s3aLogsDirectoryCreation = false; + if (!s3aLogsDirectory.exists()) { + s3aLogsDirectoryCreation = s3aLogsDirectory.mkdir(); + } + if(s3aLogsDirectoryCreation) { + while (listOfS3LogFiles.hasNext()) { + Path s3LogFilePath = listOfS3LogFiles.next().getPath(); + File s3LogLocalFilePath = + new File(s3aLogsDirectory, s3LogFilePath.getName()); + boolean localFileCreation = s3LogLocalFilePath.createNewFile(); + if (localFileCreation) { + FileStatus fileStatus = s3AFileSystem.getFileStatus(s3LogFilePath); + long s3LogFileLength = fileStatus.getLen(); + //Reads s3 file data into byte buffer + byte[] 
s3LogDataBuffer = Review Comment: Also if we are first copying all the files from S3 to local fs, we can try to parallelize this as well. ########## hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/TestS3AAuditLogMerger.java: ########## @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.audit; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; + +import org.junit.After; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * MergerTest will implement different tests on Merger class methods. + */ +public class TestS3AAuditLogMerger { + + private static final Logger LOG = + LoggerFactory.getLogger(TestS3AAuditLogMerger.class); + + private final S3AAuditLogMerger s3AAuditLogMerger = new S3AAuditLogMerger(); + + /** + * Sample directories and files to test. 
+ */ + private final File auditLogFile = new File("AuditLogFile"); + private File file1; + private File file2; + private File file3; + private File dir1; + private File dir2; + + /** + * Testing the mergeFiles method in Merger class. + * by passing a sample directory which contains files with some content in it. + * and checks if files in a directory are merged into single file. + */ + @Test + public void testMergeFiles() throws IOException { + dir1 = Files.createTempDirectory("sampleFilesDirectory").toFile(); + file1 = File.createTempFile("sampleFile1", ".txt", dir1); + file2 = File.createTempFile("sampleFile2", ".txt", dir1); + file3 = File.createTempFile("sampleFile3", ".txt", dir1); + try (FileWriter fw = new FileWriter(file1); + FileWriter fw1 = new FileWriter(file2); + FileWriter fw2 = new FileWriter(file3)) { + fw.write("abcd"); + fw1.write("efgh"); + fw2.write("ijkl"); + } + s3AAuditLogMerger.mergeFiles(dir1.getPath()); + String str = + new String(Files.readAllBytes(Paths.get(auditLogFile.getPath()))); + //File content of each audit log file in merged audit log file are + // divided by '\n'. + // Performing assertions will be easy by replacing '\n' with '' + String fileText = str.replace("\n", ""); + assertTrue("the string 'abcd' should be in the merged file", + fileText.contains("abcd")); + assertTrue("the string 'efgh' should be in the merged file", + fileText.contains("efgh")); + assertTrue("the string 'ijkl' should be in the merged file", + fileText.contains("ijkl")); + } + + /** + * Testing the merged file. + * by passing different directories which contains files with some content. + * in it and checks if the file is overwritten by new file contents. 
+ */ + @Test + public void testMergedFile() throws IOException { + //Testing the merged file with contents of first directory + dir1 = Files.createTempDirectory("sampleFilesDirectory").toFile(); + file1 = File.createTempFile("sampleFile1", ".txt", dir1); + file2 = File.createTempFile("sampleFile2", ".txt", dir1); + try (FileWriter fw = new FileWriter(file1); + FileWriter fw1 = new FileWriter(file2)) { + fw.write("abcd"); + fw1.write("efgh"); + } + s3AAuditLogMerger.mergeFiles(dir1.getPath()); + String str = + new String(Files.readAllBytes(Paths.get(auditLogFile.getPath()))); + //File content of each audit log file in merged audit log file are + // divided by '\n'. + // Performing assertions will be easy by replacing '\n' with '' + String fileText = str.replace("\n", ""); + assertTrue("the string 'abcd' should be in the merged file", + fileText.contains("abcd")); + assertTrue("the string 'efgh' should be in the merged file", + fileText.contains("efgh")); + assertFalse("the string 'ijkl' should not be in the merged file", + fileText.contains("ijkl")); + + //Testing the merged file with contents of second directory + dir2 = Files.createTempDirectory("sampleFilesDirectory1").toFile(); + file3 = File.createTempFile("sampleFile3", ".txt", dir2); + try (FileWriter fw2 = new FileWriter(file3)) { + fw2.write("ijkl"); + } + s3AAuditLogMerger.mergeFiles(dir2.getPath()); + String str1 = + new String(Files.readAllBytes(Paths.get(auditLogFile.getPath()))); + //File content of each audit log file in merged audit log file are + // divided by '\n'. 
+ // Performing assertions will be easy by replacing '\n' with '' + String fileText1 = str1.replace("\n", ""); + assertFalse("the string 'abcd' should not be in the merged file", + fileText1.contains("abcd")); + assertFalse("the string 'efgh' should not be in the merged file", + fileText1.contains("efgh")); + assertTrue("the string 'ijkl' should be in the merged file", + fileText1.contains("ijkl")); + } + + /** + * Testing the mergeFiles method in Merger class. + * by passing an empty directory and checks if merged file is created or not. + */ + @Test + public void testMergeFilesEmptyDir() throws IOException { + dir1 = Files.createTempDirectory("emptyFilesDirectory").toFile(); + if (auditLogFile.exists()) { + LOG.info("AuditLogFile already exists and we are deleting it here"); + if (auditLogFile.delete()) { + LOG.debug("AuditLogFile deleted"); + } + } + s3AAuditLogMerger.mergeFiles(dir1.getPath()); + assertFalse( + "This AuditLogFile shouldn't exist if input directory is empty ", + auditLogFile.exists()); + } + + /** + * Delete all the sample directories and sample files after all tests. + */ + @After + public void tearDown() throws Exception { + if (auditLogFile.exists()) { Review Comment: Use a base directory and delete all the child files in one go rather than one by one. Issue Time Tracking ------------------- Worklog Id: (was: 783992) Time Spent: 6.5h (was: 6h 20m) > Merging of S3A Audit Logs > ------------------------- > > Key: HADOOP-18258 > URL: https://issues.apache.org/jira/browse/HADOOP-18258 > Project: Hadoop Common > Issue Type: Sub-task > Components: fs/s3 > Reporter: Sravani Gadey > Assignee: Sravani Gadey > Priority: Major > Labels: pull-request-available > Time Spent: 6.5h > Remaining Estimate: 0h > > Merging audit log files containing a huge number of audit logs collected from a > job, such as a Hive or Spark job, that issues various S3 requests like list, head, > get and put requests.
-- This message was sent by Atlassian Jira (v8.20.7#820007) --------------------------------------------------------------------- To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-issues-h...@hadoop.apache.org