This is an automated email from the ASF dual-hosted git repository. eyang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push: new accb811 YARN-6929. Improved partition algorithm for yarn remote-app-log-dir. Contributed by Prabhu Joseph accb811 is described below commit accb811e5727f2a780a41cd5e50bab47a0cccb68 Author: Eric Yang <ey...@apache.org> AuthorDate: Tue Apr 30 17:04:59 2019 -0400 YARN-6929. Improved partition algorithm for yarn remote-app-log-dir. Contributed by Prabhu Joseph --- .../apache/hadoop/yarn/conf/YarnConfiguration.java | 11 +- .../AggregatedLogDeletionService.java | 16 +- .../yarn/logaggregation/LogAggregationUtils.java | 184 +++++++++++++++++++-- .../LogAggregationFileController.java | 49 +++--- .../LogAggregationFileControllerFactory.java | 37 +++-- .../ifile/LogAggregationIndexedFileController.java | 7 + .../src/main/resources/yarn-default.xml | 8 + .../TestAggregatedLogDeletionService.java | 163 +++++++++++------- .../logaggregation/TestContainerLogsUtils.java | 5 +- .../logaggregation/TestLogAggregationService.java | 40 ++++- 10 files changed, 398 insertions(+), 122 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index b21d763..273f1a9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1434,13 +1434,20 @@ public class YarnConfiguration extends Configuration { public static final String DEFAULT_NM_REMOTE_APP_LOG_DIR = "/tmp/logs"; /** - * The remote log dir will be created at - * NM_REMOTE_APP_LOG_DIR/${user}/NM_REMOTE_APP_LOG_DIR_SUFFIX/${appId} + * The remote log dir will be created at below location. + * NM_REMOTE_APP_LOG_DIR/${user}/bucket_{NM_REMOTE_APP_LOG_DIR_SUFFIX} + * /${bucketDir}/${appId} */ public static final String NM_REMOTE_APP_LOG_DIR_SUFFIX = NM_PREFIX + "remote-app-log-dir-suffix"; public static final String DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX="logs"; + /** Specifies whether Older Application Log Directory is included. */ + public static final String NM_REMOTE_APP_LOG_DIR_INCLUDE_OLDER = + NM_PREFIX + "remote-app-log-dir-include-older"; + public static final boolean DEFAULT_NM_REMOTE_APP_LOG_DIR_INCLUDE_OLDER = + true; + public static final String YARN_LOG_SERVER_URL = YARN_PREFIX + "log.server.url"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java index 90395aa..033339a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java @@ -67,7 +67,7 @@ public class AggregatedLogDeletionService extends AbstractService { public LogDeletionTask(Configuration conf, long retentionSecs, ApplicationClientProtocol rmClient) { this.conf = conf; this.retentionMillis = retentionSecs * 1000; - this.suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf); + this.suffix = LogAggregationUtils.getBucketSuffix(); this.remoteRootLogDir = new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR)); @@ -82,8 +82,18 @@ public class AggregatedLogDeletionService extends AbstractService { FileSystem fs = remoteRootLogDir.getFileSystem(conf); for(FileStatus userDir : fs.listStatus(remoteRootLogDir)) { if(userDir.isDirectory()) { - Path userDirPath = new Path(userDir.getPath(), suffix); - deleteOldLogDirsFrom(userDirPath, cutoffMillis, fs, rmClient); + for (FileStatus suffixDir : fs.listStatus(userDir.getPath())) { + Path suffixDirPath = suffixDir.getPath(); + if (suffixDir.isDirectory() && suffixDirPath.getName(). + startsWith(suffix)) { + for (FileStatus bucketDir : fs.listStatus(suffixDirPath)) { + if (bucketDir.isDirectory()) { + deleteOldLogDirsFrom(bucketDir.getPath(), cutoffMillis, + fs, rmClient); + } + } + } + } } } } catch (Throwable t) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java index edf2cf3..3f5151b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java @@ -38,6 +38,7 @@ import java.util.List; public class LogAggregationUtils { public static final String TMP_FILE_SUFFIX = ".tmp"; + private static final String BUCKET_SUFFIX = "bucket_"; /** * Constructs the full filename for an application's log file per node. @@ -64,8 +65,22 @@ public class LogAggregationUtils { */ public static Path getRemoteAppLogDir(Path remoteRootLogDir, ApplicationId appId, String user, String suffix) { - return new Path(getRemoteLogSuffixedDir(remoteRootLogDir, user, suffix), - appId.toString()); + return new Path(getRemoteBucketDir(remoteRootLogDir, user, suffix, + appId), appId.toString()); + } + + /** + * Gets the older remote app log dir. + * @param appId the application id + * @param user the application owner + * @param remoteRootLogDir the aggregated log remote root log dir + * @param suffix the log directory suffix + * @return the remote application specific log dir. + */ + public static Path getOlderRemoteAppLogDir(ApplicationId appId, + String user, Path remoteRootLogDir, String suffix) { + return new Path(getOlderRemoteLogSuffixedDir(remoteRootLogDir, user, + suffix), appId.toString()); } /** @@ -77,6 +92,19 @@ public class LogAggregationUtils { */ public static Path getRemoteLogSuffixedDir(Path remoteRootLogDir, String user, String suffix) { + suffix = getBucketSuffix() + suffix; + return new Path(getRemoteLogUserDir(remoteRootLogDir, user), suffix); + } + + /** + * Gets the older remote suffixed log dir for the user. + * @param remoteRootLogDir the aggregated log remote root log dir + * @param user the application owner + * @param suffix the log dir suffix + * @return the older remote suffixed log dir. + */ + public static Path getOlderRemoteLogSuffixedDir(Path remoteRootLogDir, + String user, String suffix) { if (suffix == null || suffix.isEmpty()) { return getRemoteLogUserDir(remoteRootLogDir, user); } @@ -95,6 +123,33 @@ public class LogAggregationUtils { } /** + * Gets the remote log user's bucket dir. + * @param remoteRootLogDir the aggregated log remote root log dir + * @param user the application owner + * @param suffix the log dir suffix + * @param appId the application id + * @return the remote log per user per cluster timestamp per bucket dir. + */ + public static Path getRemoteBucketDir(Path remoteRootLogDir, String user, + String suffix, ApplicationId appId) { + int bucket = appId.getId() % 10000; + String bucketDir = String.format("%04d", bucket); + return new Path(getRemoteLogSuffixedDir(remoteRootLogDir, + user, suffix), bucketDir); + } + + /** + * Check if older Application Log Directory has to be included. + * @param conf the configuration + * @return Is Older App Log Dir enabled? + */ + public static boolean isOlderPathEnabled(Configuration conf) { + return conf.getBoolean(YarnConfiguration. + NM_REMOTE_APP_LOG_DIR_INCLUDE_OLDER, + YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_INCLUDE_OLDER); + } + + /** * Returns the suffix component of the log dir. * @param conf the configuration * @return the suffix which will be appended to the user log dir. @@ -104,6 +159,14 @@ public class LogAggregationUtils { YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX); } + /** + * Returns the bucket suffix component of the log dir. + * @return the bucket suffix which appended to user log dir + */ + public static String getBucketSuffix() { + return BUCKET_SUFFIX; + } + /** * Converts a nodeId to a form used in the app log file name. @@ -177,6 +240,24 @@ public class LogAggregationUtils { /** * Get all available log files under remote app log directory. * @param conf the configuration + * @param remoteAppLogDir the application log directory + * @param appId the applicationId + * @param appOwner the application owner + * @return the iterator of available log files + * @throws IOException if there is no log file directory + */ + public static RemoteIterator<FileStatus> getNodeFiles(Configuration conf, + Path remoteAppLogDir, ApplicationId appId, String appOwner) + throws IOException { + Path qualifiedLogDir = + FileContext.getFileContext(conf).makeQualified(remoteAppLogDir); + return FileContext.getFileContext( + qualifiedLogDir.toUri(), conf).listStatus(remoteAppLogDir); + } + + /** + * Get all available log files under remote app log directory. + * @param conf the configuration * @param appId the applicationId * @param appOwner the application owner * @param remoteRootLogDir the remote root log directory @@ -188,14 +269,58 @@ public class LogAggregationUtils { Configuration conf, ApplicationId appId, String appOwner, org.apache.hadoop.fs.Path remoteRootLogDir, String suffix) throws IOException { + RemoteIterator<FileStatus> nodeFilesCur= null; + RemoteIterator<FileStatus> nodeFilesPrev = null; + StringBuilder diagnosticsMsg = new StringBuilder(); + + // Get Node Files from new app log dir Path remoteAppLogDir = getRemoteAppLogDir(conf, appId, appOwner, remoteRootLogDir, suffix); - RemoteIterator<FileStatus> nodeFiles = null; - Path qualifiedLogDir = - FileContext.getFileContext(conf).makeQualified(remoteAppLogDir); - nodeFiles = FileContext.getFileContext(qualifiedLogDir.toUri(), - conf).listStatus(remoteAppLogDir); - return nodeFiles; + try { + nodeFilesCur = getNodeFiles(conf, remoteAppLogDir, appId, appOwner); + } catch (IOException ex) { + diagnosticsMsg.append(ex.getMessage() + "\n"); + } + + // Get Node Files from old app log dir + if (isOlderPathEnabled(conf)) { + remoteAppLogDir = getOlderRemoteAppLogDir(appId, appOwner, + remoteRootLogDir, suffix); + try { + nodeFilesPrev = getNodeFiles(conf, + remoteAppLogDir, appId, appOwner); + } catch (IOException ex) { + diagnosticsMsg.append(ex.getMessage() + "\n"); + } + + // Return older files if new app log dir does not exist + if (nodeFilesCur == null) { + return nodeFilesPrev; + } else if (nodeFilesPrev != null) { + // Return both new and old node files combined + RemoteIterator<FileStatus> curDir = nodeFilesCur; + RemoteIterator<FileStatus> prevDir = nodeFilesPrev; + RemoteIterator<FileStatus> nodeFilesCombined = new + RemoteIterator<FileStatus>() { + @Override + public boolean hasNext() throws IOException { + return prevDir.hasNext() || curDir.hasNext(); + } + + @Override + public FileStatus next() throws IOException { + return prevDir.hasNext() ? prevDir.next() : curDir.next(); + } + }; + return nodeFilesCombined; + } + } + + // Error reading from or new app log dir does not exist + if (nodeFilesCur == null) { + throw new IOException(diagnosticsMsg.toString()); + } + return nodeFilesCur; } /** @@ -212,13 +337,39 @@ public class LogAggregationUtils { Configuration conf, ApplicationId appId, String appOwner, org.apache.hadoop.fs.Path remoteRootLogDir, String suffix) throws IOException { + StringBuilder diagnosticsMsg = new StringBuilder(); Path remoteAppLogDir = getRemoteAppLogDir(conf, appId, appOwner, remoteRootLogDir, suffix); List<FileStatus> nodeFiles = new ArrayList<>(); Path qualifiedLogDir = FileContext.getFileContext(conf).makeQualified(remoteAppLogDir); - nodeFiles.addAll(Arrays.asList(FileContext.getFileContext( - qualifiedLogDir.toUri(), conf).util().listStatus(remoteAppLogDir))); + + // Get Node Files from new app log dir + try { + nodeFiles.addAll(Arrays.asList(FileContext.getFileContext( + qualifiedLogDir.toUri(), conf).util().listStatus(remoteAppLogDir))); + } catch (IOException ex) { + diagnosticsMsg.append(ex.getMessage() + "\n"); + } + + // Get Node Files from old app log dir + if (isOlderPathEnabled(conf)) { + remoteAppLogDir = getOlderRemoteAppLogDir(appId, appOwner, + remoteRootLogDir, suffix); + qualifiedLogDir = FileContext.getFileContext(conf). + makeQualified(remoteAppLogDir); + try { + nodeFiles.addAll(Arrays.asList(FileContext.getFileContext( + qualifiedLogDir.toUri(), conf).util().listStatus(remoteAppLogDir))); + } catch (IOException ex) { + diagnosticsMsg.append(ex.getMessage() + "\n"); + } + } + + // Error reading from or new app log dir does not exist + if (nodeFiles.isEmpty()) { + throw new IOException(diagnosticsMsg.toString()); + } return nodeFiles; } @@ -233,12 +384,11 @@ public class LogAggregationUtils { public static RemoteIterator<FileStatus> getRemoteNodeFileDir( Configuration conf, ApplicationId appId, String appOwner) throws IOException { - Path remoteAppLogDir = getRemoteAppLogDir(conf, appId, appOwner); - RemoteIterator<FileStatus> nodeFiles = null; - Path qualifiedLogDir = - FileContext.getFileContext(conf).makeQualified(remoteAppLogDir); - nodeFiles = FileContext.getFileContext(qualifiedLogDir.toUri(), - conf).listStatus(remoteAppLogDir); - return nodeFiles; + String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf); + Path remoteRootLogDir = new Path(conf.get( + YarnConfiguration.NM_REMOTE_APP_LOG_DIR, + YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR)); + return getRemoteNodeFileDir(conf, appId, appOwner, + remoteRootLogDir, suffix); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java index 17a8aad..053a563 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java @@ -31,9 +31,11 @@ import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; + import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; @@ -361,32 +363,25 @@ public abstract class LogAggregationFileController { // unnecessary load on the filesystem from all of the nodes Path appDir = LogAggregationUtils.getRemoteAppLogDir( remoteRootLogDir, appId, user, remoteRootLogDirSuffix); - - appDir = appDir.makeQualified(remoteFS.getUri(), + Path curDir = appDir.makeQualified(remoteFS.getUri(), + remoteFS.getWorkingDirectory()); + Path rootLogDir = remoteRootLogDir.makeQualified(remoteFS.getUri(), remoteFS.getWorkingDirectory()); - if (!checkExists(remoteFS, appDir, APP_DIR_PERMISSIONS)) { - Path suffixDir = LogAggregationUtils.getRemoteLogSuffixedDir( - remoteRootLogDir, user, remoteRootLogDirSuffix); - suffixDir = suffixDir.makeQualified(remoteFS.getUri(), - remoteFS.getWorkingDirectory()); - - if (!checkExists(remoteFS, suffixDir, APP_DIR_PERMISSIONS)) { - Path userDir = LogAggregationUtils.getRemoteLogUserDir( - remoteRootLogDir, user); - userDir = userDir.makeQualified(remoteFS.getUri(), - remoteFS.getWorkingDirectory()); - - if (!checkExists(remoteFS, userDir, APP_DIR_PERMISSIONS)) { - createDir(remoteFS, userDir, APP_DIR_PERMISSIONS); - } + LinkedList<Path> pathsToCreate = new LinkedList<>(); - createDir(remoteFS, suffixDir, APP_DIR_PERMISSIONS); + while (!curDir.equals(rootLogDir)) { + if (!checkExists(remoteFS, curDir, APP_DIR_PERMISSIONS)) { + pathsToCreate.addFirst(curDir); + curDir = curDir.getParent(); + } else { + break; } - - createDir(remoteFS, appDir, APP_DIR_PERMISSIONS); } + for (Path path : pathsToCreate) { + createDir(remoteFS, path, APP_DIR_PERMISSIONS); + } } catch (IOException e) { LOG.error("Failed to setup application log directory for " + appId, e); @@ -411,7 +406,6 @@ public abstract class LogAggregationFileController { protected void createDir(FileSystem fs, Path path, FsPermission fsPerm) throws IOException { - if (fsSupportsChmod) { FsPermission dirPerm = new FsPermission(fsPerm); fs.mkdirs(path, dirPerm); @@ -467,6 +461,19 @@ public abstract class LogAggregationFileController { this.remoteRootLogDir, this.remoteRootLogDirSuffix); } + /** + * Get the older remote application directory for log aggregation. + * @param appId the Application ID + * @param appOwner the Application Owner + * @return the older remote application directory + * @throws IOException if can not find the remote application directory + */ + public Path getOlderRemoteAppLogDir(ApplicationId appId, String appOwner) + throws IOException { + return LogAggregationUtils.getOlderRemoteAppLogDir(appId, appOwner, + this.remoteRootLogDir, this.remoteRootLogDirSuffix); + } + protected void cleanOldLogs(Path remoteNodeLogFileForApp, final NodeId nodeId, UserGroupInformation userUgi) { try { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileControllerFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileControllerFactory.java index 8339c1a..c653691 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileControllerFactory.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileControllerFactory.java @@ -34,13 +34,11 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileContext; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; /** * Use {@code LogAggregationFileControllerFactory} to get the correct @@ -159,24 +157,39 @@ public class LogAggregationFileControllerFactory { */ public LogAggregationFileController getFileControllerForRead( ApplicationId appId, String appOwner) throws IOException { - StringBuilder diagnosis = new StringBuilder(); - for(LogAggregationFileController fileController : controllers) { + StringBuilder diagnosticsMsg = new StringBuilder(); + + if (LogAggregationUtils.isOlderPathEnabled(conf)) { + for (LogAggregationFileController fileController : controllers) { + try { + Path remoteAppLogDir = fileController.getOlderRemoteAppLogDir(appId, + appOwner); + if (LogAggregationUtils.getNodeFiles(conf, remoteAppLogDir, appId, + appOwner).hasNext()) { + return fileController; + } + } catch (Exception ex) { + diagnosticsMsg.append(ex.getMessage() + "\n"); + continue; + } + } + } + + for (LogAggregationFileController fileController : controllers) { try { Path remoteAppLogDir = fileController.getRemoteAppLogDir( appId, appOwner); - Path qualifiedLogDir = FileContext.getFileContext(conf).makeQualified( - remoteAppLogDir); - RemoteIterator<FileStatus> nodeFiles = FileContext.getFileContext( - qualifiedLogDir.toUri(), conf).listStatus(remoteAppLogDir); - if (nodeFiles.hasNext()) { + if (LogAggregationUtils.getNodeFiles(conf, remoteAppLogDir, + appId, appOwner).hasNext()) { return fileController; } } catch (Exception ex) { - diagnosis.append(ex.getMessage() + "\n"); + diagnosticsMsg.append(ex.getMessage() + "\n"); continue; } } - throw new IOException(diagnosis.toString()); + + throw new IOException(diagnosticsMsg.toString()); } private boolean validateAggregatedFileControllerName(String name) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java index 9ab3e37..9bb4f9d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java @@ -819,6 +819,13 @@ public class LogAggregationIndexedFileController this.remoteRootLogDir, this.remoteRootLogDirSuffix); } + @Override + public Path getOlderRemoteAppLogDir(ApplicationId appId, String user) + throws IOException { + return LogAggregationUtils.getOlderRemoteAppLogDir(appId, user, + this.remoteRootLogDir, this.remoteRootLogDirSuffix); + } + @Private public IndexedLogsMeta loadIndexedLogsMeta(Path remoteLogPath, long end, ApplicationId appId) throws IOException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index a00b5d6..c2bcdb6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -1345,6 +1345,14 @@ </property> <property> + <description>If set to true, the older application log directory + will be considered while fetching application logs. + </description> + <name>yarn.nodemanager.remote-app-log-dir-include-older</name> + <value>true</value> + </property> + + <property> <description>Generate additional logs about container launches. Currently, this creates a copy of the launch script and lists the directory contents of the container work dir. When listing directory diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java index f36ebf4..f6855d6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java @@ -60,13 +60,14 @@ public class TestAggregatedLogDeletionService { String root = "mockfs://foo/"; String remoteRootLogDir = root+"tmp/logs"; String suffix = "logs"; + String newSuffix = LogAggregationUtils.getBucketSuffix() + suffix; final Configuration conf = new Configuration(); conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class); conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true"); conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "1800"); conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir); conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix); - + Path rootPath = new Path(root); FileSystem rootFs = rootPath.getFileSystem(conf); FileSystem mockFs = ((FilterFileSystem)rootFs).getRawFileSystem(); @@ -80,39 +81,52 @@ public class TestAggregatedLogDeletionService { new FileStatus[]{userDirStatus}); ApplicationId appId1 = - ApplicationId.newInstance(System.currentTimeMillis(), 1); - Path userLogDir = new Path(userDir, suffix); - Path app1Dir = new Path(userLogDir, appId1.toString()); - FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app1Dir); + ApplicationId.newInstance(now, 1); + Path suffixDir = new Path(userDir, newSuffix); + FileStatus suffixDirStatus = new FileStatus(0, true, + 0, 0, toDeleteTime, suffixDir); + Path bucketDir = LogAggregationUtils.getRemoteBucketDir( + remoteRootLogPath, "me", suffix, appId1); + FileStatus bucketDirStatus = new FileStatus(0, true, 0, + 0, toDeleteTime, bucketDir); + Path app1Dir = LogAggregationUtils.getRemoteAppLogDir( + remoteRootLogPath, appId1, "me", suffix); + FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, + toDeleteTime, app1Dir); ApplicationId appId2 = - ApplicationId.newInstance(System.currentTimeMillis(), 2); - Path app2Dir = new Path(userLogDir, appId2.toString()); - FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app2Dir); + ApplicationId.newInstance(now, 2); + Path app2Dir = LogAggregationUtils.getRemoteAppLogDir( + remoteRootLogPath, appId2, "me", suffix); + FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, + toDeleteTime, app2Dir); ApplicationId appId3 = - ApplicationId.newInstance(System.currentTimeMillis(), 3); - Path app3Dir = new Path(userLogDir, appId3.toString()); - FileStatus app3DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app3Dir); + ApplicationId.newInstance(now, 3); + Path app3Dir = LogAggregationUtils.getRemoteAppLogDir( + remoteRootLogPath, appId3, "me", suffix); + FileStatus app3DirStatus = new FileStatus(0, true, 0, 0, + toDeleteTime, app3Dir); ApplicationId appId4 = - ApplicationId.newInstance(System.currentTimeMillis(), 4); - Path app4Dir = new Path(userLogDir, appId4.toString()); - FileStatus app4DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app4Dir); - - ApplicationId appId5 = - ApplicationId.newInstance(System.currentTimeMillis(), 5); - Path app5Dir = new Path(userLogDir, appId5.toString()); - FileStatus app5DirStatus = - new FileStatus(0, true, 0, 0, toDeleteTime, app5Dir); - - when(mockFs.listStatus(userLogDir)).thenReturn( - new FileStatus[] { app1DirStatus, app2DirStatus, app3DirStatus, - app4DirStatus, app5DirStatus }); + ApplicationId.newInstance(now, 4); + Path app4Dir = LogAggregationUtils.getRemoteAppLogDir( + remoteRootLogPath, appId4, "me", suffix); + FileStatus app4DirStatus = + new FileStatus(0, true, 0, 0, toDeleteTime, app4Dir); + + when(mockFs.listStatus(userDir)).thenReturn( + new FileStatus[] {suffixDirStatus}); + when(mockFs.listStatus(suffixDir)).thenReturn( + new FileStatus[] {bucketDirStatus}); + when(mockFs.listStatus(bucketDir)).thenReturn( + new FileStatus[] {app1DirStatus, app2DirStatus, + app3DirStatus, app4DirStatus}); when(mockFs.listStatus(app1Dir)).thenReturn( new FileStatus[]{}); - + + Path app2Log1 = new Path(app2Dir, "host1"); FileStatus app2Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app2Log1); @@ -137,25 +151,16 @@ public class TestAggregatedLogDeletionService { FileStatus app4Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app4Log1); Path app4Log2 = new Path(app4Dir, "host2"); - FileStatus app4Log2Status = new FileStatus(10, false, 1, 1, toDeleteTime, app4Log2); - + FileStatus app4Log2Status = new FileStatus(10, false, 1, 1, + toKeepTime, app4Log2); + when(mockFs.listStatus(app4Dir)).thenReturn( new FileStatus[]{app4Log1Status, app4Log2Status}); - Path app5Log1 = new Path(app5Dir, "host1"); - FileStatus app5Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app5Log1); - - Path app5Log2 = new Path(app5Dir, "host2"); - FileStatus app5Log2Status = new FileStatus(10, false, 1, 1, toKeepTime, app5Log2); - - when(mockFs.listStatus(app5Dir)).thenReturn( - new FileStatus[]{app5Log1Status, app5Log2Status}); - final List<ApplicationId> finishedApplications = - Collections.unmodifiableList(Arrays.asList(appId1, appId2, appId3, - appId4)); + Collections.unmodifiableList(Arrays.asList(appId1, appId2, appId3)); final List<ApplicationId> runningApplications = - Collections.unmodifiableList(Arrays.asList(appId5)); + Collections.unmodifiableList(Arrays.asList(appId4)); AggregatedLogDeletionService deletionService = new AggregatedLogDeletionService() { @@ -180,10 +185,9 @@ public class TestAggregatedLogDeletionService { verify(mockFs, timeout(2000)).delete(app1Dir, true); verify(mockFs, timeout(2000).times(0)).delete(app2Dir, true); verify(mockFs, timeout(2000)).delete(app3Dir, true); - verify(mockFs, timeout(2000)).delete(app4Dir, true); - verify(mockFs, timeout(2000).times(0)).delete(app5Dir, true); - verify(mockFs, timeout(2000)).delete(app5Log1, true); - verify(mockFs, timeout(2000).times(0)).delete(app5Log2, true); + verify(mockFs, timeout(2000).times(0)).delete(app4Dir, true); + verify(mockFs, timeout(2000)).delete(app4Log1, true); + verify(mockFs, timeout(2000).times(0)).delete(app4Log2, true); deletionService.stop(); } @@ -198,6 +202,7 @@ public class TestAggregatedLogDeletionService { String root = "mockfs://foo/"; String remoteRootLogDir = root + "tmp/logs"; String suffix = "logs"; + String newSuffix = LogAggregationUtils.getBucketSuffix() + suffix; final Configuration conf = new Configuration(); conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class); conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true"); @@ -220,24 +225,36 @@ public class TestAggregatedLogDeletionService { when(mockFs.listStatus(remoteRootLogPath)).thenReturn( new FileStatus[] { userDirStatus }); - Path userLogDir = new Path(userDir, suffix); + Path suffixDir = new Path(userDir, newSuffix); + FileStatus suffixStatus = new FileStatus(0, true, 0, 0, before50Secs, + suffixDir); ApplicationId appId1 = ApplicationId.newInstance(System.currentTimeMillis(), 1); //Set time last modified of app1Dir directory and its files to before2000Secs - Path app1Dir = new Path(userLogDir, appId1.toString()); + Path app1Dir = LogAggregationUtils.getRemoteAppLogDir( + remoteRootLogPath, appId1, "me", suffix); + Path bucketDir = LogAggregationUtils.getRemoteBucketDir( + remoteRootLogPath, "me", suffix, appId1); + FileStatus bucketDirStatus = new FileStatus(0, true, 0, + 0, before50Secs, bucketDir); FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, before2000Secs, app1Dir); ApplicationId appId2 = ApplicationId.newInstance(System.currentTimeMillis(), 2); //Set time last modified of app1Dir directory and its files to before50Secs - Path app2Dir = new Path(userLogDir, appId2.toString()); + Path app2Dir = LogAggregationUtils.getRemoteAppLogDir( + remoteRootLogPath, appId2, "me", suffix); FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, before50Secs, app2Dir); - when(mockFs.listStatus(userLogDir)).thenReturn( - new FileStatus[] { app1DirStatus, app2DirStatus }); + when(mockFs.listStatus(userDir)).thenReturn( + new FileStatus[] {suffixStatus }); + when(mockFs.listStatus(suffixDir)).thenReturn( + new FileStatus[] {bucketDirStatus }); + when(mockFs.listStatus(bucketDir)).thenReturn( + new FileStatus[] {app1DirStatus, app2DirStatus }); Path app1Log1 = new Path(app1Dir, "host1"); FileStatus app1Log1Status = new FileStatus(10, false, 1, 1, before2000Secs, @@ -310,6 +327,7 @@ public class TestAggregatedLogDeletionService { String root = "mockfs://foo/"; String remoteRootLogDir = root+"tmp/logs"; String suffix = "logs"; + String newSuffix = LogAggregationUtils.getBucketSuffix() + suffix; Configuration conf = new Configuration(); conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class); conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true"); @@ -334,12 +352,24 @@ public class TestAggregatedLogDeletionService { ApplicationId appId1 = ApplicationId.newInstance(System.currentTimeMillis(), 1); - Path userLogDir = new Path(userDir, suffix); - Path app1Dir = new Path(userLogDir, appId1.toString()); + Path suffixDir = new Path(userDir, newSuffix); + FileStatus suffixDirStatus = new FileStatus(0, true, 0, 0, now, + suffixDir); + Path bucketDir = LogAggregationUtils.getRemoteBucketDir( + remoteRootLogPath, "me", suffix, appId1); + Path app1Dir = LogAggregationUtils.getRemoteAppLogDir( + remoteRootLogPath, appId1, "me", suffix); + FileStatus bucketDirStatus = new FileStatus(0, true, 0, + 0, now, bucketDir); + FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, now, app1Dir); - when(mockFs.listStatus(userLogDir)).thenReturn( - new FileStatus[]{app1DirStatus}); + when(mockFs.listStatus(userDir)).thenReturn( + new FileStatus[] {suffixDirStatus}); + when(mockFs.listStatus(suffixDir)).thenReturn( + new FileStatus[] {bucketDirStatus}); + when(mockFs.listStatus(bucketDir)).thenReturn( + new FileStatus[] {app1DirStatus}); Path app1Log1 = new Path(app1Dir, "host1"); FileStatus app1Log1Status = new FileStatus(10, false, 1, 1, now, app1Log1); @@ -373,10 +403,15 @@ public class TestAggregatedLogDeletionService { verify(mockFs, never()).delete(app1Dir, true); // modify the timestamp of the logs and verify it's picked up quickly + bucketDirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, bucketDir); app1DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app1Dir); app1Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app1Log1); - when(mockFs.listStatus(userLogDir)).thenReturn( - new FileStatus[]{app1DirStatus}); + when(mockFs.listStatus(userDir)).thenReturn( + new FileStatus[] {suffixDirStatus}); + when(mockFs.listStatus(suffixDir)).thenReturn( + new FileStatus[] {bucketDirStatus }); + when(mockFs.listStatus(bucketDir)).thenReturn( + new FileStatus[] {app1DirStatus }); when(mockFs.listStatus(app1Dir)).thenReturn( new FileStatus[]{app1Log1Status}); @@ -392,6 +427,7 @@ public class TestAggregatedLogDeletionService { String root = "mockfs://foo/"; String remoteRootLogDir = root+"tmp/logs"; String suffix = "logs"; + String newSuffix = LogAggregationUtils.getBucketSuffix() + suffix; Configuration conf = new Configuration(); conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class); @@ -411,27 +447,36 @@ public class TestAggregatedLogDeletionService { Path remoteRootLogPath = new Path(remoteRootLogDir); Path userDir = new Path(remoteRootLogPath, "me"); + Path suffixDir = new Path(userDir, newSuffix); FileStatus userDirStatus = new FileStatus(0, true, 0, 0, 0, userDir); + FileStatus suffixStatus = new FileStatus(0, true, 0, 0, 0, suffixDir); + Path bucketDir = new Path(suffixDir, String.valueOf(0)); + FileStatus bucketDirStatus = new FileStatus(0, true, 0, 0, 0, bucketDir); when(mockFs.listStatus(remoteRootLogPath)).thenReturn( new FileStatus[]{userDirStatus}); + when(mockFs.listStatus(userDir)).thenReturn( + new FileStatus[]{suffixStatus}); + when(mockFs.listStatus(suffixDir)).thenReturn( + new FileStatus[]{bucketDirStatus}); - Path userLogDir = new Path(userDir, suffix); ApplicationId appId1 = ApplicationId.newInstance(System.currentTimeMillis(), 1); - Path app1Dir = new Path(userLogDir, appId1.toString()); + Path app1Dir = new Path(bucketDir, appId1.toString()); FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, 0, app1Dir); ApplicationId appId2 = ApplicationId.newInstance(System.currentTimeMillis(), 2); - Path app2Dir = new Path(userLogDir, "application_a"); + Path app2Dir = new Path(bucketDir, "application_a"); FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, 0, app2Dir); ApplicationId appId3 = ApplicationId.newInstance(System.currentTimeMillis(), 3); - Path app3Dir = new Path(userLogDir, appId3.toString()); + Path app3Dir = new Path(bucketDir, appId3.toString()); FileStatus app3DirStatus = new FileStatus(0, true, 0, 0, 0, app3Dir); - when(mockFs.listStatus(userLogDir)).thenReturn( + when(mockFs.listStatus(bucketDir)).thenReturn( new FileStatus[]{app1DirStatus, app2DirStatus, app3DirStatus}); + when(mockFs.listStatus(app2Dir)).thenReturn( + new FileStatus[]{}); when(mockFs.listStatus(app1Dir)).thenThrow( new RuntimeException("Should Be Caught and Logged")); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java index 4767282..231e0e2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java @@ -87,8 +87,9 @@ public final class TestContainerLogsUtils { createContainerLogInLocalDir(appLogsDir, containerId, fs, fileName, content); // upload container logs to remote log dir - Path path = new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR), - user + "/logs/" + appId.toString()); + Path path = LogAggregationUtils.getRemoteAppLogDir( + new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR)), + appId, user, "logs"); if (fs.exists(path) && deleteRemoteLogDir) { fs.delete(path, true); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java index 796c7e7..adcec8d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java @@ -692,14 +692,16 @@ public class TestLogAggregationService extends BaseContainerManagerTest { logAggregationService.stop(); } + @Test public void testAppLogDirCreation() throws Exception { - final String logSuffix = "logs"; + final String logSuffix = "bucket_logs"; + final String inputSuffix = "logs"; this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath()); this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, this.remoteRootLogDir.getAbsolutePath()); - this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, logSuffix); + this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, inputSuffix); InlineDispatcher dispatcher = new InlineDispatcher(); dispatcher.init(this.conf); @@ -733,7 +735,12 @@ public class TestLogAggregationService extends BaseContainerManagerTest { Path userDir = fs.makeQualified(new Path( remoteRootLogDir.getAbsolutePath(), this.user)); Path suffixDir = new Path(userDir, logSuffix); - Path appDir = new Path(suffixDir, appId.toString()); + Path bucketDir = fs.makeQualified(LogAggregationUtils.getRemoteBucketDir( + new Path(remoteRootLogDir.getAbsolutePath()), + this.user, inputSuffix, appId)); + Path appDir = fs.makeQualified(LogAggregationUtils.getRemoteAppLogDir( + new Path(remoteRootLogDir.getAbsolutePath()), appId, + this.user, inputSuffix)); LogAggregationContext contextWithAllContainers = Records.newRecord(LogAggregationContext.class); contextWithAllContainers.setLogAggregationPolicyClassName( @@ -742,23 +749,44 @@ public class TestLogAggregationService extends BaseContainerManagerTest { this.acls, contextWithAllContainers)); verify(spyFs).mkdirs(eq(userDir), isA(FsPermission.class)); verify(spyFs).mkdirs(eq(suffixDir), isA(FsPermission.class)); + verify(spyFs).mkdirs(eq(bucketDir), isA(FsPermission.class)); verify(spyFs).mkdirs(eq(appDir), isA(FsPermission.class)); // start another application and verify only app dir created ApplicationId appId2 = BuilderUtils.newApplicationId(1, 2); - Path appDir2 = new Path(suffixDir, appId2.toString()); + Path appDir2 = fs.makeQualified(LogAggregationUtils.getRemoteAppLogDir( + new Path(remoteRootLogDir.getAbsolutePath()), + appId2, this.user, inputSuffix)); aggSvc.handle(new LogHandlerAppStartedEvent(appId2, this.user, null, this.acls, contextWithAllContainers)); verify(spyFs).mkdirs(eq(appDir2), isA(FsPermission.class)); // start another application with the app dir already created and verify // we do not try to create it again - ApplicationId appId3 = BuilderUtils.newApplicationId(1, 3); - Path appDir3 = new Path(suffixDir, appId3.toString()); + ApplicationId appId3 = BuilderUtils.newApplicationId(2, 2); + Path appDir3 = fs.makeQualified(LogAggregationUtils.getRemoteAppLogDir( + new Path(remoteRootLogDir.getAbsolutePath()), + appId3, this.user, inputSuffix)); new File(appDir3.toUri().getPath()).mkdir(); aggSvc.handle(new LogHandlerAppStartedEvent(appId3, this.user, null, this.acls, contextWithAllContainers)); verify(spyFs, never()).mkdirs(eq(appDir3), isA(FsPermission.class)); + + + // Verify we do not create bucket dir again + ApplicationId appId4 = BuilderUtils.newApplicationId(2, 10003); + Path bucketDir4 = fs.makeQualified(LogAggregationUtils.getRemoteBucketDir( + new Path(remoteRootLogDir.getAbsolutePath()), + this.user, logSuffix, appId4)); + new File(bucketDir4.toUri().getPath()).mkdir(); + Path appDir4 = fs.makeQualified(LogAggregationUtils.getRemoteAppLogDir( + new Path(remoteRootLogDir.getAbsolutePath()), + appId4, this.user, inputSuffix)); + aggSvc.handle(new LogHandlerAppStartedEvent(appId4, this.user, null, + this.acls, contextWithAllContainers)); + verify(spyFs, never()).mkdirs(eq(bucketDir4), isA(FsPermission.class)); + verify(spyFs).mkdirs(eq(appDir4), isA(FsPermission.class)); + aggSvc.stop(); aggSvc.close(); dispatcher.stop(); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org