YARN-7891. LogAggregationIndexedFileController should support reading from HAR files. (Xuan Gong via wangda)
Change-Id: I16e081f21c5f80160564c49cc49d103bd8eb7e16 Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/583f4594 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/583f4594 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/583f4594 Branch: refs/heads/HDFS-12996 Commit: 583f4594314b3db25b57b1e46ea8026eab21f932 Parents: e718ac5 Author: Wangda Tan <wan...@apache.org> Authored: Wed Mar 7 15:46:22 2018 -0800 Committer: Wangda Tan <wan...@apache.org> Committed: Wed Mar 7 15:46:47 2018 -0800 ---------------------------------------------------------------------- .../hadoop-yarn/hadoop-yarn-common/pom.xml | 4 ++ .../LogAggregationIndexedFileController.java | 60 +++++++++++++------ .../TestLogAggregationIndexFileController.java | 54 +++++++++++++++++ .../application_123456_0001.har/_SUCCESS | 0 .../application_123456_0001.har/_index | 3 + .../application_123456_0001.har/_masterindex | 2 + .../application_123456_0001.har/part-0 | Bin 0 -> 4123 bytes 7 files changed, 104 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/583f4594/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml index a235478..5378072 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml @@ -249,6 +249,10 @@ <exclude>src/test/resources/application_1440536969523_0001.har/part-0</exclude> <exclude>src/test/resources/application_1440536969523_0001.har/_masterindex</exclude> <exclude>src/test/resources/application_1440536969523_0001.har/_SUCCESS</exclude> + <exclude>src/test/resources/application_123456_0001.har/_index</exclude> + 
<exclude>src/test/resources/application_123456_0001.har/part-0</exclude> + <exclude>src/test/resources/application_123456_0001.har/_masterindex</exclude> + <exclude>src/test/resources/application_123456_0001.har/_SUCCESS</exclude> </excludes> </configuration> </plugin> http://git-wip-us.apache.org/repos/asf/hadoop/blob/583f4594/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java index 56bae26..5bba2e0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java @@ -495,16 +495,21 @@ public class LogAggregationIndexedFileController boolean getAllContainers = (containerIdStr == null || containerIdStr.isEmpty()); long size = logRequest.getBytes(); - List<FileStatus> nodeFiles = LogAggregationUtils - .getRemoteNodeFileList(conf, appId, logRequest.getAppOwner(), + RemoteIterator<FileStatus> nodeFiles = LogAggregationUtils + .getRemoteNodeFileDir(conf, appId, logRequest.getAppOwner(), this.remoteRootLogDir, this.remoteRootLogDirSuffix); - if (nodeFiles.isEmpty()) { + if (!nodeFiles.hasNext()) { throw new IOException("There is no available log fils for " + "application:" + appId); } - Map<String, Long> checkSumFiles = parseCheckSumFiles(nodeFiles); + List<FileStatus> 
allFiles = getAllNodeFiles(nodeFiles, appId); + if (allFiles.isEmpty()) { + throw new IOException("There is no available log fils for " + + "application:" + appId); + } + Map<String, Long> checkSumFiles = parseCheckSumFiles(allFiles); List<FileStatus> fileToRead = getNodeLogFileToRead( - nodeFiles, nodeIdStr, appId); + allFiles, nodeIdStr, appId); byte[] buf = new byte[65535]; for (FileStatus thisNodeFile : fileToRead) { String nodeName = thisNodeFile.getPath().getName(); @@ -609,16 +614,21 @@ public class LogAggregationIndexedFileController containerIdStr.isEmpty()); String nodeIdStr = (nodeId == null || nodeId.isEmpty()) ? null : LogAggregationUtils.getNodeString(nodeId); - List<FileStatus> nodeFiles = LogAggregationUtils - .getRemoteNodeFileList(conf, appId, appOwner, this.remoteRootLogDir, + RemoteIterator<FileStatus> nodeFiles = LogAggregationUtils + .getRemoteNodeFileDir(conf, appId, appOwner, this.remoteRootLogDir, this.remoteRootLogDirSuffix); - if (nodeFiles.isEmpty()) { + if (!nodeFiles.hasNext()) { throw new IOException("There is no available log fils for " + "application:" + appId); } - Map<String, Long> checkSumFiles = parseCheckSumFiles(nodeFiles); + List<FileStatus> allFiles = getAllNodeFiles(nodeFiles, appId); + if (allFiles.isEmpty()) { + throw new IOException("There is no available log fils for " + + "application:" + appId); + } + Map<String, Long> checkSumFiles = parseCheckSumFiles(allFiles); List<FileStatus> fileToRead = getNodeLogFileToRead( - nodeFiles, nodeIdStr, appId); + allFiles, nodeIdStr, appId); for(FileStatus thisNodeFile : fileToRead) { try { Long checkSumIndex = checkSumFiles.get( @@ -727,21 +737,33 @@ public class LogAggregationIndexedFileController List<FileStatus> nodeFiles, String nodeId, ApplicationId appId) throws IOException { List<FileStatus> listOfFiles = new ArrayList<>(); - List<FileStatus> files = new ArrayList<>(nodeFiles); - for (FileStatus file : files) { - String nodeName = file.getPath().getName(); + for (FileStatus 
thisNodeFile : nodeFiles) { + String nodeName = thisNodeFile.getPath().getName(); if ((nodeId == null || nodeId.isEmpty() || nodeName.contains(LogAggregationUtils .getNodeString(nodeId))) && !nodeName.endsWith( LogAggregationUtils.TMP_FILE_SUFFIX) && !nodeName.endsWith(CHECK_SUM_FILE_SUFFIX)) { - if (nodeName.equals(appId + ".har")) { - Path p = new Path("har:///" + file.getPath().toUri().getRawPath()); - files = Arrays.asList(HarFs.get(p.toUri(), conf).listStatus(p)); - continue; - } - listOfFiles.add(file); + listOfFiles.add(thisNodeFile); + } + } + return listOfFiles; + } + + private List<FileStatus> getAllNodeFiles( + RemoteIterator<FileStatus> nodeFiles, ApplicationId appId) + throws IOException { + List<FileStatus> listOfFiles = new ArrayList<>(); + while (nodeFiles != null && nodeFiles.hasNext()) { + FileStatus thisNodeFile = nodeFiles.next(); + String nodeName = thisNodeFile.getPath().getName(); + if (nodeName.equals(appId + ".har")) { + Path p = new Path("har:///" + + thisNodeFile.getPath().toUri().getRawPath()); + nodeFiles = HarFs.get(p.toUri(), conf).listStatusIterator(p); + continue; } + listOfFiles.add(thisNodeFile); } return listOfFiles; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/583f4594/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexFileController.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexFileController.java index 9c02c1b..7922679 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexFileController.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexFileController.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.logaggregation.filecontroller.ifile; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertNotNull; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.io.ByteArrayOutputStream; @@ -27,6 +28,7 @@ import java.io.FileWriter; import java.io.IOException; import java.io.PrintStream; import java.io.Writer; +import java.net.URL; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.HashMap; @@ -364,6 +366,58 @@ public class TestLogAggregationIndexFileController { sysOutStream.reset(); } + @Test(timeout = 15000) + public void testFetchApplictionLogsHar() throws Exception { + List<String> newLogTypes = new ArrayList<>(); + newLogTypes.add("syslog"); + newLogTypes.add("stdout"); + newLogTypes.add("stderr"); + newLogTypes.add("test1"); + newLogTypes.add("test2"); + URL harUrl = ClassLoader.getSystemClassLoader() + .getResource("application_123456_0001.har"); + assertNotNull(harUrl); + + Path path = new Path(remoteLogDir + "/" + USER_UGI.getShortUserName() + + "/logs/application_123456_0001"); + if (fs.exists(path)) { + fs.delete(path, true); + } + assertTrue(fs.mkdirs(path)); + Path harPath = new Path(path, "application_123456_0001.har"); + fs.copyFromLocalFile(false, new Path(harUrl.toURI()), harPath); + assertTrue(fs.exists(harPath)); + LogAggregationIndexedFileController fileFormat + = new LogAggregationIndexedFileController(); + fileFormat.initialize(conf, "Indexed"); + ContainerLogsRequest logRequest = new ContainerLogsRequest(); + logRequest.setAppId(appId); + 
logRequest.setNodeId(nodeId.toString()); + logRequest.setAppOwner(USER_UGI.getShortUserName()); + logRequest.setContainerId(containerId.toString()); + logRequest.setBytes(Long.MAX_VALUE); + List<ContainerLogMeta> meta = fileFormat.readAggregatedLogsMeta( + logRequest); + Assert.assertEquals(meta.size(), 3); + List<String> fileNames = new ArrayList<>(); + for (ContainerLogMeta log : meta) { + Assert.assertTrue(log.getContainerId().equals(containerId.toString())); + Assert.assertTrue(log.getNodeId().equals(nodeId.toString())); + for (ContainerLogFileInfo file : log.getContainerLogMeta()) { + fileNames.add(file.getFileName()); + } + } + fileNames.removeAll(newLogTypes); + Assert.assertTrue(fileNames.isEmpty()); + boolean foundLogs = fileFormat.readAggregatedLogs(logRequest, System.out); + Assert.assertTrue(foundLogs); + for (String logType : newLogTypes) { + Assert.assertTrue(sysOutStream.toString().contains(logMessage( + containerId, logType))); + } + sysOutStream.reset(); + } + private File createAndWriteLocalLogFile(ContainerId containerId, Path localLogDir, String logType) throws IOException { File file = new File(localLogDir.toString(), logType); http://git-wip-us.apache.org/repos/asf/hadoop/blob/583f4594/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/_SUCCESS ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/_SUCCESS b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/_SUCCESS new file mode 100644 index 0000000..e69de29 http://git-wip-us.apache.org/repos/asf/hadoop/blob/583f4594/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/_index ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/_index b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/_index new file mode 100644 index 0000000..b042846 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/_index @@ -0,0 +1,3 @@ +%2F dir 1517728311922+493+xuan+supergroup 0 0 localhost_9999_1517727665265 localhost_9999_1517727668513 +%2Flocalhost_9999_1517727665265 file part-0 0 2895 1517728301581+420+xuan+supergroup +%2Flocalhost_9999_1517727668513 file part-0 2895 1228 1517728311919+420+xuan+supergroup http://git-wip-us.apache.org/repos/asf/hadoop/blob/583f4594/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/_masterindex ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/_masterindex b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/_masterindex new file mode 100644 index 0000000..cda8cbd --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/_masterindex @@ -0,0 +1,2 @@ +3 +0 1897968749 0 280 http://git-wip-us.apache.org/repos/asf/hadoop/blob/583f4594/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/part-0 ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/part-0 b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/part-0 new file mode 100644 index 0000000..aea95fa Binary files /dev/null and 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/part-0 differ --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org