This is an automated email from the ASF dual-hosted git repository.

eyang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new accb811  YARN-6929.  Improved partition algorithm for yarn 
remote-app-log-dir.             Contributed by Prabhu Joseph
accb811 is described below

commit accb811e5727f2a780a41cd5e50bab47a0cccb68
Author: Eric Yang <ey...@apache.org>
AuthorDate: Tue Apr 30 17:04:59 2019 -0400

    YARN-6929.  Improved partition algorithm for yarn remote-app-log-dir.
                Contributed by Prabhu Joseph
---
 .../apache/hadoop/yarn/conf/YarnConfiguration.java |  11 +-
 .../AggregatedLogDeletionService.java              |  16 +-
 .../yarn/logaggregation/LogAggregationUtils.java   | 184 +++++++++++++++++++--
 .../LogAggregationFileController.java              |  49 +++---
 .../LogAggregationFileControllerFactory.java       |  37 +++--
 .../ifile/LogAggregationIndexedFileController.java |   7 +
 .../src/main/resources/yarn-default.xml            |   8 +
 .../TestAggregatedLogDeletionService.java          | 163 +++++++++++-------
 .../logaggregation/TestContainerLogsUtils.java     |   5 +-
 .../logaggregation/TestLogAggregationService.java  |  40 ++++-
 10 files changed, 398 insertions(+), 122 deletions(-)

diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index b21d763..273f1a9 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1434,13 +1434,20 @@ public class YarnConfiguration extends Configuration {
   public static final String DEFAULT_NM_REMOTE_APP_LOG_DIR = "/tmp/logs";
 
   /**
-   * The remote log dir will be created at
-   * NM_REMOTE_APP_LOG_DIR/${user}/NM_REMOTE_APP_LOG_DIR_SUFFIX/${appId}
+   * The remote log dir will be created at below location.
+   * NM_REMOTE_APP_LOG_DIR/${user}/bucket_{NM_REMOTE_APP_LOG_DIR_SUFFIX}
+   * /${bucketDir}/${appId}
    */
   public static final String NM_REMOTE_APP_LOG_DIR_SUFFIX = 
     NM_PREFIX + "remote-app-log-dir-suffix";
   public static final String DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX="logs";
 
+  /** Specifies whether Older Application Log Directory is included. */
+  public static final String NM_REMOTE_APP_LOG_DIR_INCLUDE_OLDER =
+      NM_PREFIX + "remote-app-log-dir-include-older";
+  public static final boolean DEFAULT_NM_REMOTE_APP_LOG_DIR_INCLUDE_OLDER =
+      true;
+
   public static final String YARN_LOG_SERVER_URL =
     YARN_PREFIX + "log.server.url";
 
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java
index 90395aa..033339a 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java
@@ -67,7 +67,7 @@ public class AggregatedLogDeletionService extends 
AbstractService {
     public LogDeletionTask(Configuration conf, long retentionSecs, 
ApplicationClientProtocol rmClient) {
       this.conf = conf;
       this.retentionMillis = retentionSecs * 1000;
-      this.suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf);
+      this.suffix = LogAggregationUtils.getBucketSuffix();
       this.remoteRootLogDir =
         new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
             YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
@@ -82,8 +82,18 @@ public class AggregatedLogDeletionService extends 
AbstractService {
         FileSystem fs = remoteRootLogDir.getFileSystem(conf);
         for(FileStatus userDir : fs.listStatus(remoteRootLogDir)) {
           if(userDir.isDirectory()) {
-            Path userDirPath = new Path(userDir.getPath(), suffix);
-            deleteOldLogDirsFrom(userDirPath, cutoffMillis, fs, rmClient);
+            for (FileStatus suffixDir : fs.listStatus(userDir.getPath())) {
+              Path suffixDirPath = suffixDir.getPath();
+              if (suffixDir.isDirectory() && suffixDirPath.getName().
+                  startsWith(suffix)) {
+                for (FileStatus bucketDir : fs.listStatus(suffixDirPath)) {
+                  if (bucketDir.isDirectory()) {
+                    deleteOldLogDirsFrom(bucketDir.getPath(), cutoffMillis,
+                        fs, rmClient);
+                  }
+                }
+              }
+            }
           }
         }
       } catch (Throwable t) {
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java
index edf2cf3..3f5151b 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java
@@ -38,6 +38,7 @@ import java.util.List;
 public class LogAggregationUtils {
 
   public static final String TMP_FILE_SUFFIX = ".tmp";
+  private static final String BUCKET_SUFFIX = "bucket_";
 
   /**
    * Constructs the full filename for an application's log file per node.
@@ -64,8 +65,22 @@ public class LogAggregationUtils {
    */
   public static Path getRemoteAppLogDir(Path remoteRootLogDir,
       ApplicationId appId, String user, String suffix) {
-    return new Path(getRemoteLogSuffixedDir(remoteRootLogDir, user, suffix),
-        appId.toString());
+    return new Path(getRemoteBucketDir(remoteRootLogDir, user, suffix,
+        appId), appId.toString());
+  }
+
+  /**
+   * Gets the older remote app log dir.
+   * @param appId the application id
+   * @param user the application owner
+   * @param remoteRootLogDir the aggregated log remote root log dir
+   * @param suffix the log directory suffix
+   * @return the remote application specific log dir.
+   */
+  public static Path getOlderRemoteAppLogDir(ApplicationId appId,
+      String user, Path remoteRootLogDir, String suffix) {
+    return new Path(getOlderRemoteLogSuffixedDir(remoteRootLogDir, user,
+         suffix), appId.toString());
   }
 
   /**
@@ -77,6 +92,19 @@ public class LogAggregationUtils {
    */
   public static Path getRemoteLogSuffixedDir(Path remoteRootLogDir,
       String user, String suffix) {
+    suffix = getBucketSuffix() + suffix;
+    return new Path(getRemoteLogUserDir(remoteRootLogDir, user), suffix);
+  }
+
+  /**
+   * Gets the older remote suffixed log dir for the user.
+   * @param remoteRootLogDir the aggregated log remote root log dir
+   * @param user the application owner
+   * @param suffix the log dir suffix
+   * @return the older remote suffixed log dir.
+   */
+  public static Path getOlderRemoteLogSuffixedDir(Path remoteRootLogDir,
+      String user, String suffix) {
     if (suffix == null || suffix.isEmpty()) {
       return getRemoteLogUserDir(remoteRootLogDir, user);
     }
@@ -95,6 +123,33 @@ public class LogAggregationUtils {
   }
 
   /**
+   * Gets the remote log user's bucket dir.
+   * @param remoteRootLogDir the aggregated log remote root log dir
+   * @param user the application owner
+   * @param suffix the log dir suffix
+   * @param appId the application id
+   * @return the remote log per user per cluster timestamp per bucket dir.
+   */
+  public static Path getRemoteBucketDir(Path remoteRootLogDir, String user,
+      String suffix, ApplicationId appId) {
+    int bucket = appId.getId() % 10000;
+    String bucketDir = String.format("%04d", bucket);
+    return new Path(getRemoteLogSuffixedDir(remoteRootLogDir,
+       user, suffix), bucketDir);
+  }
+
+  /**
+   * Check if older Application Log Directory has to be included.
+   * @param conf the configuration
+   * @return Is Older App Log Dir enabled?
+   */
+  public static boolean isOlderPathEnabled(Configuration conf) {
+    return conf.getBoolean(YarnConfiguration.
+         NM_REMOTE_APP_LOG_DIR_INCLUDE_OLDER,
+             YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_INCLUDE_OLDER);
+  }
+
+  /**
    * Returns the suffix component of the log dir.
    * @param conf the configuration
    * @return the suffix which will be appended to the user log dir.
@@ -104,6 +159,14 @@ public class LogAggregationUtils {
         YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);
   }
 
+  /**
+   * Returns the bucket suffix component of the log dir.
+   * @return the bucket suffix which appended to user log dir
+   */
+  public static String getBucketSuffix() {
+    return BUCKET_SUFFIX;
+  }
+
   
   /**
    * Converts a nodeId to a form used in the app log file name.
@@ -177,6 +240,24 @@ public class LogAggregationUtils {
   /**
    * Get all available log files under remote app log directory.
    * @param conf the configuration
+   * @param remoteAppLogDir the application log directory
+   * @param appId the applicationId
+   * @param appOwner the application owner
+   * @return the iterator of available log files
+   * @throws IOException if there is no log file directory
+   */
+  public static RemoteIterator<FileStatus> getNodeFiles(Configuration conf,
+      Path remoteAppLogDir, ApplicationId appId, String appOwner)
+      throws IOException {
+    Path qualifiedLogDir =
+        FileContext.getFileContext(conf).makeQualified(remoteAppLogDir);
+    return FileContext.getFileContext(
+        qualifiedLogDir.toUri(), conf).listStatus(remoteAppLogDir);
+  }
+
+  /**
+   * Get all available log files under remote app log directory.
+   * @param conf the configuration
    * @param appId the applicationId
    * @param appOwner the application owner
    * @param remoteRootLogDir the remote root log directory
@@ -188,14 +269,58 @@ public class LogAggregationUtils {
       Configuration conf, ApplicationId appId, String appOwner,
       org.apache.hadoop.fs.Path remoteRootLogDir, String suffix)
       throws IOException {
+    RemoteIterator<FileStatus> nodeFilesCur= null;
+    RemoteIterator<FileStatus> nodeFilesPrev = null;
+    StringBuilder diagnosticsMsg = new StringBuilder();
+
+    // Get Node Files from new app log dir
     Path remoteAppLogDir = getRemoteAppLogDir(conf, appId, appOwner,
         remoteRootLogDir, suffix);
-    RemoteIterator<FileStatus> nodeFiles = null;
-    Path qualifiedLogDir =
-        FileContext.getFileContext(conf).makeQualified(remoteAppLogDir);
-    nodeFiles = FileContext.getFileContext(qualifiedLogDir.toUri(),
-        conf).listStatus(remoteAppLogDir);
-    return nodeFiles;
+    try {
+      nodeFilesCur = getNodeFiles(conf, remoteAppLogDir, appId, appOwner);
+    } catch (IOException ex) {
+      diagnosticsMsg.append(ex.getMessage() + "\n");
+    }
+
+    // Get Node Files from old app log dir
+    if (isOlderPathEnabled(conf)) {
+      remoteAppLogDir = getOlderRemoteAppLogDir(appId, appOwner,
+              remoteRootLogDir, suffix);
+      try {
+        nodeFilesPrev = getNodeFiles(conf,
+                remoteAppLogDir, appId, appOwner);
+      } catch (IOException ex) {
+        diagnosticsMsg.append(ex.getMessage() + "\n");
+      }
+
+      // Return older files if new app log dir does not exist
+      if (nodeFilesCur == null) {
+        return nodeFilesPrev;
+      } else if (nodeFilesPrev != null) {
+        // Return both new and old node files combined
+        RemoteIterator<FileStatus> curDir = nodeFilesCur;
+        RemoteIterator<FileStatus> prevDir = nodeFilesPrev;
+        RemoteIterator<FileStatus> nodeFilesCombined = new
+            RemoteIterator<FileStatus>() {
+            @Override
+            public boolean hasNext() throws IOException {
+              return prevDir.hasNext() || curDir.hasNext();
+            }
+
+            @Override
+            public FileStatus next() throws IOException {
+              return prevDir.hasNext() ? prevDir.next() : curDir.next();
+            }
+        };
+        return nodeFilesCombined;
+      }
+    }
+
+    // Error reading from or new app log dir does not exist
+    if (nodeFilesCur == null) {
+      throw new IOException(diagnosticsMsg.toString());
+    }
+    return nodeFilesCur;
   }
 
   /**
@@ -212,13 +337,39 @@ public class LogAggregationUtils {
       Configuration conf, ApplicationId appId, String appOwner,
       org.apache.hadoop.fs.Path remoteRootLogDir, String suffix)
       throws IOException {
+    StringBuilder diagnosticsMsg = new StringBuilder();
     Path remoteAppLogDir = getRemoteAppLogDir(conf, appId, appOwner,
         remoteRootLogDir, suffix);
     List<FileStatus> nodeFiles = new ArrayList<>();
     Path qualifiedLogDir =
         FileContext.getFileContext(conf).makeQualified(remoteAppLogDir);
-    nodeFiles.addAll(Arrays.asList(FileContext.getFileContext(
-        qualifiedLogDir.toUri(), conf).util().listStatus(remoteAppLogDir)));
+
+    // Get Node Files from new app log dir
+    try {
+      nodeFiles.addAll(Arrays.asList(FileContext.getFileContext(
+          qualifiedLogDir.toUri(), conf).util().listStatus(remoteAppLogDir)));
+    } catch (IOException ex) {
+      diagnosticsMsg.append(ex.getMessage() + "\n");
+    }
+
+    // Get Node Files from old app log dir
+    if (isOlderPathEnabled(conf)) {
+      remoteAppLogDir = getOlderRemoteAppLogDir(appId, appOwner,
+          remoteRootLogDir, suffix);
+      qualifiedLogDir = FileContext.getFileContext(conf).
+          makeQualified(remoteAppLogDir);
+      try {
+        nodeFiles.addAll(Arrays.asList(FileContext.getFileContext(
+            qualifiedLogDir.toUri(), 
conf).util().listStatus(remoteAppLogDir)));
+      } catch (IOException ex) {
+        diagnosticsMsg.append(ex.getMessage() + "\n");
+      }
+    }
+
+    // Error reading from or new app log dir does not exist
+    if (nodeFiles.isEmpty()) {
+      throw new IOException(diagnosticsMsg.toString());
+    }
     return nodeFiles;
   }
 
@@ -233,12 +384,11 @@ public class LogAggregationUtils {
   public static RemoteIterator<FileStatus> getRemoteNodeFileDir(
       Configuration conf, ApplicationId appId, String appOwner)
       throws IOException {
-    Path remoteAppLogDir = getRemoteAppLogDir(conf, appId, appOwner);
-    RemoteIterator<FileStatus> nodeFiles = null;
-    Path qualifiedLogDir =
-        FileContext.getFileContext(conf).makeQualified(remoteAppLogDir);
-    nodeFiles = FileContext.getFileContext(qualifiedLogDir.toUri(),
-        conf).listStatus(remoteAppLogDir);
-    return nodeFiles;
+    String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf);
+    Path remoteRootLogDir = new Path(conf.get(
+        YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+            YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
+    return getRemoteNodeFileDir(conf, appId, appOwner,
+        remoteRootLogDir, suffix);
   }
 }
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
index 17a8aad..053a563 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
@@ -31,9 +31,11 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
@@ -361,32 +363,25 @@ public abstract class LogAggregationFileController {
             // unnecessary load on the filesystem from all of the nodes
             Path appDir = LogAggregationUtils.getRemoteAppLogDir(
                 remoteRootLogDir, appId, user, remoteRootLogDirSuffix);
-
-            appDir = appDir.makeQualified(remoteFS.getUri(),
+            Path curDir = appDir.makeQualified(remoteFS.getUri(),
+                remoteFS.getWorkingDirectory());
+            Path rootLogDir = remoteRootLogDir.makeQualified(remoteFS.getUri(),
                 remoteFS.getWorkingDirectory());
 
-            if (!checkExists(remoteFS, appDir, APP_DIR_PERMISSIONS)) {
-              Path suffixDir = LogAggregationUtils.getRemoteLogSuffixedDir(
-                  remoteRootLogDir, user, remoteRootLogDirSuffix);
-              suffixDir = suffixDir.makeQualified(remoteFS.getUri(),
-                  remoteFS.getWorkingDirectory());
-
-              if (!checkExists(remoteFS, suffixDir, APP_DIR_PERMISSIONS)) {
-                Path userDir = LogAggregationUtils.getRemoteLogUserDir(
-                    remoteRootLogDir, user);
-                userDir = userDir.makeQualified(remoteFS.getUri(),
-                    remoteFS.getWorkingDirectory());
-
-                if (!checkExists(remoteFS, userDir, APP_DIR_PERMISSIONS)) {
-                  createDir(remoteFS, userDir, APP_DIR_PERMISSIONS);
-                }
+            LinkedList<Path> pathsToCreate = new LinkedList<>();
 
-                createDir(remoteFS, suffixDir, APP_DIR_PERMISSIONS);
+            while (!curDir.equals(rootLogDir)) {
+              if (!checkExists(remoteFS, curDir, APP_DIR_PERMISSIONS)) {
+                pathsToCreate.addFirst(curDir);
+                curDir = curDir.getParent();
+              } else {
+                break;
               }
-
-              createDir(remoteFS, appDir, APP_DIR_PERMISSIONS);
             }
 
+            for (Path path : pathsToCreate) {
+              createDir(remoteFS, path, APP_DIR_PERMISSIONS);
+            }
           } catch (IOException e) {
             LOG.error("Failed to setup application log directory for "
                 + appId, e);
@@ -411,7 +406,6 @@ public abstract class LogAggregationFileController {
 
   protected void createDir(FileSystem fs, Path path, FsPermission fsPerm)
       throws IOException {
-
     if (fsSupportsChmod) {
       FsPermission dirPerm = new FsPermission(fsPerm);
       fs.mkdirs(path, dirPerm);
@@ -467,6 +461,19 @@ public abstract class LogAggregationFileController {
         this.remoteRootLogDir, this.remoteRootLogDirSuffix);
   }
 
+  /**
+   * Get the older remote application directory for log aggregation.
+   * @param appId the Application ID
+   * @param appOwner the Application Owner
+   * @return the older remote application directory
+   * @throws IOException if can not find the remote application directory
+   */
+  public Path getOlderRemoteAppLogDir(ApplicationId appId, String appOwner)
+      throws IOException {
+    return LogAggregationUtils.getOlderRemoteAppLogDir(appId, appOwner,
+        this.remoteRootLogDir, this.remoteRootLogDirSuffix);
+  }
+
   protected void cleanOldLogs(Path remoteNodeLogFileForApp,
       final NodeId nodeId, UserGroupInformation userUgi) {
     try {
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileControllerFactory.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileControllerFactory.java
index 8339c1a..c653691 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileControllerFactory.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileControllerFactory.java
@@ -34,13 +34,11 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
 
 /**
  * Use {@code LogAggregationFileControllerFactory} to get the correct
@@ -159,24 +157,39 @@ public class LogAggregationFileControllerFactory {
    */
   public LogAggregationFileController getFileControllerForRead(
       ApplicationId appId, String appOwner) throws IOException {
-    StringBuilder diagnosis = new StringBuilder();
-    for(LogAggregationFileController fileController : controllers) {
+    StringBuilder diagnosticsMsg = new StringBuilder();
+
+    if (LogAggregationUtils.isOlderPathEnabled(conf)) {
+      for (LogAggregationFileController fileController : controllers) {
+        try {
+          Path remoteAppLogDir = fileController.getOlderRemoteAppLogDir(appId,
+              appOwner);
+          if (LogAggregationUtils.getNodeFiles(conf, remoteAppLogDir, appId,
+              appOwner).hasNext()) {
+            return fileController;
+          }
+        } catch (Exception ex) {
+          diagnosticsMsg.append(ex.getMessage() + "\n");
+          continue;
+        }
+      }
+    }
+
+    for (LogAggregationFileController fileController : controllers) {
       try {
         Path remoteAppLogDir = fileController.getRemoteAppLogDir(
             appId, appOwner);
-        Path qualifiedLogDir = FileContext.getFileContext(conf).makeQualified(
-            remoteAppLogDir);
-        RemoteIterator<FileStatus> nodeFiles = FileContext.getFileContext(
-            qualifiedLogDir.toUri(), conf).listStatus(remoteAppLogDir);
-        if (nodeFiles.hasNext()) {
+        if (LogAggregationUtils.getNodeFiles(conf, remoteAppLogDir,
+            appId, appOwner).hasNext()) {
           return fileController;
         }
       } catch (Exception ex) {
-        diagnosis.append(ex.getMessage() + "\n");
+        diagnosticsMsg.append(ex.getMessage() + "\n");
         continue;
       }
     }
-    throw new IOException(diagnosis.toString());
+
+    throw new IOException(diagnosticsMsg.toString());
   }
 
   private boolean validateAggregatedFileControllerName(String name) {
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java
index 9ab3e37..9bb4f9d 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java
@@ -819,6 +819,13 @@ public class LogAggregationIndexedFileController
         this.remoteRootLogDir, this.remoteRootLogDirSuffix);
   }
 
+  @Override
+  public Path getOlderRemoteAppLogDir(ApplicationId appId, String user)
+      throws IOException {
+    return LogAggregationUtils.getOlderRemoteAppLogDir(appId, user,
+        this.remoteRootLogDir, this.remoteRootLogDirSuffix);
+  }
+
   @Private
   public IndexedLogsMeta loadIndexedLogsMeta(Path remoteLogPath, long end,
       ApplicationId appId) throws IOException {
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index a00b5d6..c2bcdb6 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1345,6 +1345,14 @@
   </property>
 
   <property>
+    <description>If set to true, the older application log directory
+    will be considered while fetching application logs.
+    </description>
+    <name>yarn.nodemanager.remote-app-log-dir-include-older</name>
+    <value>true</value>
+  </property>
+
+  <property>
     <description>Generate additional logs about container launches.
     Currently, this creates a copy of the launch script and lists the
     directory contents of the container work dir. When listing directory
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java
index f36ebf4..f6855d6 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java
@@ -60,13 +60,14 @@ public class TestAggregatedLogDeletionService {
     String root = "mockfs://foo/";
     String remoteRootLogDir = root+"tmp/logs";
     String suffix = "logs";
+    String newSuffix = LogAggregationUtils.getBucketSuffix() + suffix;
     final Configuration conf = new Configuration();
     conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
     conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true");
     conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "1800");
     conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir);
     conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix);
-    
+
     Path rootPath = new Path(root);
     FileSystem rootFs = rootPath.getFileSystem(conf);
     FileSystem mockFs = ((FilterFileSystem)rootFs).getRawFileSystem();
@@ -80,39 +81,52 @@ public class TestAggregatedLogDeletionService {
         new FileStatus[]{userDirStatus});
 
     ApplicationId appId1 =
-        ApplicationId.newInstance(System.currentTimeMillis(), 1);
-    Path userLogDir = new Path(userDir, suffix);
-    Path app1Dir = new Path(userLogDir, appId1.toString());
-    FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, 
app1Dir);
+        ApplicationId.newInstance(now, 1);
+    Path suffixDir = new Path(userDir, newSuffix);
+    FileStatus suffixDirStatus = new FileStatus(0, true,
+        0, 0, toDeleteTime, suffixDir);
+    Path bucketDir = LogAggregationUtils.getRemoteBucketDir(
+        remoteRootLogPath, "me", suffix, appId1);
+    FileStatus bucketDirStatus = new FileStatus(0, true, 0,
+        0, toDeleteTime, bucketDir);
+    Path app1Dir = LogAggregationUtils.getRemoteAppLogDir(
+        remoteRootLogPath, appId1, "me", suffix);
+    FileStatus app1DirStatus = new FileStatus(0, true, 0, 0,
+        toDeleteTime, app1Dir);
     
     ApplicationId appId2 =
-        ApplicationId.newInstance(System.currentTimeMillis(), 2);
-    Path app2Dir = new Path(userLogDir, appId2.toString());
-    FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, 
app2Dir);
+        ApplicationId.newInstance(now, 2);
+    Path app2Dir = LogAggregationUtils.getRemoteAppLogDir(
+        remoteRootLogPath, appId2, "me", suffix);
+    FileStatus app2DirStatus = new FileStatus(0, true, 0, 0,
+        toDeleteTime, app2Dir);
     
     ApplicationId appId3 =
-        ApplicationId.newInstance(System.currentTimeMillis(), 3);
-    Path app3Dir = new Path(userLogDir, appId3.toString());
-    FileStatus app3DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, 
app3Dir);
+        ApplicationId.newInstance(now, 3);
+    Path app3Dir = LogAggregationUtils.getRemoteAppLogDir(
+        remoteRootLogPath, appId3, "me", suffix);
+    FileStatus app3DirStatus = new FileStatus(0, true, 0, 0,
+        toDeleteTime, app3Dir);
     
     ApplicationId appId4 =
-        ApplicationId.newInstance(System.currentTimeMillis(), 4);
-    Path app4Dir = new Path(userLogDir, appId4.toString());
-    FileStatus app4DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, 
app4Dir);
-    
-    ApplicationId appId5 =
-        ApplicationId.newInstance(System.currentTimeMillis(), 5);
-    Path app5Dir = new Path(userLogDir, appId5.toString());
-    FileStatus app5DirStatus =
-        new FileStatus(0, true, 0, 0, toDeleteTime, app5Dir);
-
-    when(mockFs.listStatus(userLogDir)).thenReturn(
-      new FileStatus[] { app1DirStatus, app2DirStatus, app3DirStatus,
-          app4DirStatus, app5DirStatus });
+        ApplicationId.newInstance(now, 4);
+    Path app4Dir = LogAggregationUtils.getRemoteAppLogDir(
+        remoteRootLogPath, appId4, "me", suffix);
+    FileStatus app4DirStatus =
+        new FileStatus(0, true, 0, 0, toDeleteTime, app4Dir);
+
+    when(mockFs.listStatus(userDir)).thenReturn(
+        new FileStatus[] {suffixDirStatus});
+    when(mockFs.listStatus(suffixDir)).thenReturn(
+        new FileStatus[] {bucketDirStatus});
+    when(mockFs.listStatus(bucketDir)).thenReturn(
+        new FileStatus[] {app1DirStatus, app2DirStatus,
+            app3DirStatus, app4DirStatus});
     
     when(mockFs.listStatus(app1Dir)).thenReturn(
         new FileStatus[]{});
-    
+
+
     Path app2Log1 = new Path(app2Dir, "host1");
     FileStatus app2Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, 
app2Log1);
     
@@ -137,25 +151,16 @@ public class TestAggregatedLogDeletionService {
     FileStatus app4Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, 
app4Log1);
     
     Path app4Log2 = new Path(app4Dir, "host2");
-    FileStatus app4Log2Status = new FileStatus(10, false, 1, 1, toDeleteTime, 
app4Log2);
-    
+    FileStatus app4Log2Status = new FileStatus(10, false, 1, 1,
+        toKeepTime, app4Log2);
+
     when(mockFs.listStatus(app4Dir)).thenReturn(
         new FileStatus[]{app4Log1Status, app4Log2Status});
 
-    Path app5Log1 = new Path(app5Dir, "host1");
-    FileStatus app5Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, 
app5Log1);
-    
-    Path app5Log2 = new Path(app5Dir, "host2");
-    FileStatus app5Log2Status = new FileStatus(10, false, 1, 1, toKeepTime, 
app5Log2);
-
-    when(mockFs.listStatus(app5Dir)).thenReturn(
-        new FileStatus[]{app5Log1Status, app5Log2Status});
-
     final List<ApplicationId> finishedApplications =
-        Collections.unmodifiableList(Arrays.asList(appId1, appId2, appId3,
-          appId4));
+        Collections.unmodifiableList(Arrays.asList(appId1, appId2, appId3));
     final List<ApplicationId> runningApplications =
-        Collections.unmodifiableList(Arrays.asList(appId5));
+        Collections.unmodifiableList(Arrays.asList(appId4));
 
     AggregatedLogDeletionService deletionService =
         new AggregatedLogDeletionService() {
@@ -180,10 +185,9 @@ public class TestAggregatedLogDeletionService {
     verify(mockFs, timeout(2000)).delete(app1Dir, true);
     verify(mockFs, timeout(2000).times(0)).delete(app2Dir, true);
     verify(mockFs, timeout(2000)).delete(app3Dir, true);
-    verify(mockFs, timeout(2000)).delete(app4Dir, true);
-    verify(mockFs, timeout(2000).times(0)).delete(app5Dir, true);
-    verify(mockFs, timeout(2000)).delete(app5Log1, true);
-    verify(mockFs, timeout(2000).times(0)).delete(app5Log2, true);
+    verify(mockFs, timeout(2000).times(0)).delete(app4Dir, true);
+    verify(mockFs, timeout(2000)).delete(app4Log1, true);
+    verify(mockFs, timeout(2000).times(0)).delete(app4Log2, true);
 
     deletionService.stop();
   }
@@ -198,6 +202,7 @@ public class TestAggregatedLogDeletionService {
     String root = "mockfs://foo/";
     String remoteRootLogDir = root + "tmp/logs";
     String suffix = "logs";
+    String newSuffix = LogAggregationUtils.getBucketSuffix() + suffix;
     final Configuration conf = new Configuration();
     conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
     conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true");
@@ -220,24 +225,36 @@ public class TestAggregatedLogDeletionService {
     when(mockFs.listStatus(remoteRootLogPath)).thenReturn(
         new FileStatus[] { userDirStatus });
 
-    Path userLogDir = new Path(userDir, suffix);
+    Path suffixDir = new Path(userDir, newSuffix);
+    FileStatus suffixStatus = new FileStatus(0, true, 0, 0, before50Secs,
+        suffixDir);
 
     ApplicationId appId1 =
         ApplicationId.newInstance(System.currentTimeMillis(), 1);
     //Set time last modified of app1Dir directory and its files to 
before2000Secs 
-    Path app1Dir = new Path(userLogDir, appId1.toString());
+    Path app1Dir = LogAggregationUtils.getRemoteAppLogDir(
+        remoteRootLogPath, appId1, "me", suffix);
+    Path bucketDir = LogAggregationUtils.getRemoteBucketDir(
+        remoteRootLogPath, "me", suffix, appId1);
+    FileStatus bucketDirStatus = new FileStatus(0, true, 0,
+        0, before50Secs, bucketDir);
     FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, before2000Secs,
         app1Dir);
     
     ApplicationId appId2 =
         ApplicationId.newInstance(System.currentTimeMillis(), 2);
     //Set time last modified of app1Dir directory and its files to 
before50Secs 
-    Path app2Dir = new Path(userLogDir, appId2.toString());
+    Path app2Dir = LogAggregationUtils.getRemoteAppLogDir(
+        remoteRootLogPath, appId2, "me", suffix);
     FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, before50Secs,
         app2Dir);
 
-    when(mockFs.listStatus(userLogDir)).thenReturn(
-        new FileStatus[] { app1DirStatus, app2DirStatus });
+    when(mockFs.listStatus(userDir)).thenReturn(
+        new FileStatus[] {suffixStatus });
+    when(mockFs.listStatus(suffixDir)).thenReturn(
+        new FileStatus[] {bucketDirStatus });
+    when(mockFs.listStatus(bucketDir)).thenReturn(
+        new FileStatus[] {app1DirStatus, app2DirStatus });
 
     Path app1Log1 = new Path(app1Dir, "host1");
     FileStatus app1Log1Status = new FileStatus(10, false, 1, 1, before2000Secs,
@@ -310,6 +327,7 @@ public class TestAggregatedLogDeletionService {
     String root = "mockfs://foo/";
     String remoteRootLogDir = root+"tmp/logs";
     String suffix = "logs";
+    String newSuffix = LogAggregationUtils.getBucketSuffix() + suffix;
     Configuration conf = new Configuration();
     conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
     conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true");
@@ -334,12 +352,24 @@ public class TestAggregatedLogDeletionService {
 
     ApplicationId appId1 =
         ApplicationId.newInstance(System.currentTimeMillis(), 1);
-    Path userLogDir = new Path(userDir, suffix);
-    Path app1Dir = new Path(userLogDir, appId1.toString());
+    Path suffixDir = new Path(userDir, newSuffix);
+    FileStatus suffixDirStatus = new FileStatus(0, true, 0, 0, now,
+        suffixDir);
+    Path bucketDir = LogAggregationUtils.getRemoteBucketDir(
+        remoteRootLogPath, "me", suffix, appId1);
+    Path app1Dir = LogAggregationUtils.getRemoteAppLogDir(
+        remoteRootLogPath, appId1, "me", suffix);
+    FileStatus bucketDirStatus = new FileStatus(0, true, 0,
+        0, now, bucketDir);
+
     FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, now, app1Dir);
 
-    when(mockFs.listStatus(userLogDir)).thenReturn(
-        new FileStatus[]{app1DirStatus});
+    when(mockFs.listStatus(userDir)).thenReturn(
+        new FileStatus[] {suffixDirStatus});
+    when(mockFs.listStatus(suffixDir)).thenReturn(
+        new FileStatus[] {bucketDirStatus});
+    when(mockFs.listStatus(bucketDir)).thenReturn(
+        new FileStatus[] {app1DirStatus});
 
     Path app1Log1 = new Path(app1Dir, "host1");
     FileStatus app1Log1Status = new FileStatus(10, false, 1, 1, now, app1Log1);
@@ -373,10 +403,15 @@ public class TestAggregatedLogDeletionService {
     verify(mockFs, never()).delete(app1Dir, true);
 
     // modify the timestamp of the logs and verify it's picked up quickly
+    bucketDirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, bucketDir);
     app1DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app1Dir);
     app1Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app1Log1);
-    when(mockFs.listStatus(userLogDir)).thenReturn(
-        new FileStatus[]{app1DirStatus});
+    when(mockFs.listStatus(userDir)).thenReturn(
+        new FileStatus[] {suffixDirStatus});
+    when(mockFs.listStatus(suffixDir)).thenReturn(
+        new FileStatus[] {bucketDirStatus });
+    when(mockFs.listStatus(bucketDir)).thenReturn(
+        new FileStatus[] {app1DirStatus });
     when(mockFs.listStatus(app1Dir)).thenReturn(
         new FileStatus[]{app1Log1Status});
 
@@ -392,6 +427,7 @@ public class TestAggregatedLogDeletionService {
     String root = "mockfs://foo/";
     String remoteRootLogDir = root+"tmp/logs";
     String suffix = "logs";
+    String newSuffix = LogAggregationUtils.getBucketSuffix() + suffix;
     Configuration conf = new Configuration();
     conf.setClass("fs.mockfs.impl", MockFileSystem.class,
         FileSystem.class);
@@ -411,27 +447,36 @@ public class TestAggregatedLogDeletionService {
     Path remoteRootLogPath = new Path(remoteRootLogDir);
 
     Path userDir = new Path(remoteRootLogPath, "me");
+    Path suffixDir = new Path(userDir, newSuffix);
     FileStatus userDirStatus = new FileStatus(0, true, 0, 0, 0, userDir);
+    FileStatus suffixStatus = new FileStatus(0, true, 0, 0, 0, suffixDir);
+    Path bucketDir = new Path(suffixDir, String.valueOf(0));
+    FileStatus bucketDirStatus = new FileStatus(0, true, 0, 0, 0, bucketDir);
 
     when(mockFs.listStatus(remoteRootLogPath)).thenReturn(
         new FileStatus[]{userDirStatus});
+    when(mockFs.listStatus(userDir)).thenReturn(
+        new FileStatus[]{suffixStatus});
+    when(mockFs.listStatus(suffixDir)).thenReturn(
+        new FileStatus[]{bucketDirStatus});
 
-    Path userLogDir = new Path(userDir, suffix);
     ApplicationId appId1 =
         ApplicationId.newInstance(System.currentTimeMillis(), 1);
-    Path app1Dir = new Path(userLogDir, appId1.toString());
+    Path app1Dir = new Path(bucketDir, appId1.toString());
     FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, 0, app1Dir);
     ApplicationId appId2 =
         ApplicationId.newInstance(System.currentTimeMillis(), 2);
-    Path app2Dir = new Path(userLogDir, "application_a");
+    Path app2Dir = new Path(bucketDir, "application_a");
     FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, 0, app2Dir);
     ApplicationId appId3 =
         ApplicationId.newInstance(System.currentTimeMillis(), 3);
-    Path app3Dir = new Path(userLogDir, appId3.toString());
+    Path app3Dir = new Path(bucketDir, appId3.toString());
     FileStatus app3DirStatus = new FileStatus(0, true, 0, 0, 0, app3Dir);
 
-    when(mockFs.listStatus(userLogDir)).thenReturn(
+    when(mockFs.listStatus(bucketDir)).thenReturn(
         new FileStatus[]{app1DirStatus, app2DirStatus, app3DirStatus});
+    when(mockFs.listStatus(app2Dir)).thenReturn(
+            new FileStatus[]{});
 
     when(mockFs.listStatus(app1Dir)).thenThrow(
         new RuntimeException("Should Be Caught and Logged"));
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java
index 4767282..231e0e2 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java
@@ -87,8 +87,9 @@ public final class TestContainerLogsUtils {
     createContainerLogInLocalDir(appLogsDir, containerId, fs, fileName,
         content);
     // upload container logs to remote log dir
-    Path path = new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR),
-        user + "/logs/" + appId.toString());
+    Path path = LogAggregationUtils.getRemoteAppLogDir(
+        new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR)),
+            appId, user, "logs");
     if (fs.exists(path) && deleteRemoteLogDir) {
       fs.delete(path, true);
     }
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
index 796c7e7..adcec8d 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
@@ -692,14 +692,16 @@ public class TestLogAggregationService extends 
BaseContainerManagerTest {
     logAggregationService.stop();
   }
 
+
   @Test
   public void testAppLogDirCreation() throws Exception {
-    final String logSuffix = "logs";
+    final String logSuffix = "bucket_logs";
+    final String inputSuffix = "logs";
     this.conf.set(YarnConfiguration.NM_LOG_DIRS,
         localLogDir.getAbsolutePath());
     this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
         this.remoteRootLogDir.getAbsolutePath());
-    this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, logSuffix);
+    this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, inputSuffix);
 
     InlineDispatcher dispatcher = new InlineDispatcher();
     dispatcher.init(this.conf);
@@ -733,7 +735,12 @@ public class TestLogAggregationService extends 
BaseContainerManagerTest {
     Path userDir = fs.makeQualified(new Path(
         remoteRootLogDir.getAbsolutePath(), this.user));
     Path suffixDir = new Path(userDir, logSuffix);
-    Path appDir = new Path(suffixDir, appId.toString());
+    Path bucketDir = fs.makeQualified(LogAggregationUtils.getRemoteBucketDir(
+        new Path(remoteRootLogDir.getAbsolutePath()),
+            this.user, inputSuffix, appId));
+    Path appDir = fs.makeQualified(LogAggregationUtils.getRemoteAppLogDir(
+        new Path(remoteRootLogDir.getAbsolutePath()), appId,
+            this.user, inputSuffix));
     LogAggregationContext contextWithAllContainers =
         Records.newRecord(LogAggregationContext.class);
     contextWithAllContainers.setLogAggregationPolicyClassName(
@@ -742,23 +749,44 @@ public class TestLogAggregationService extends 
BaseContainerManagerTest {
         this.acls, contextWithAllContainers));
     verify(spyFs).mkdirs(eq(userDir), isA(FsPermission.class));
     verify(spyFs).mkdirs(eq(suffixDir), isA(FsPermission.class));
+    verify(spyFs).mkdirs(eq(bucketDir), isA(FsPermission.class));
     verify(spyFs).mkdirs(eq(appDir), isA(FsPermission.class));
 
     // start another application and verify only app dir created
     ApplicationId appId2 = BuilderUtils.newApplicationId(1, 2);
-    Path appDir2 = new Path(suffixDir, appId2.toString());
+    Path appDir2 = fs.makeQualified(LogAggregationUtils.getRemoteAppLogDir(
+        new Path(remoteRootLogDir.getAbsolutePath()),
+            appId2, this.user, inputSuffix));
     aggSvc.handle(new LogHandlerAppStartedEvent(appId2, this.user, null,
         this.acls, contextWithAllContainers));
     verify(spyFs).mkdirs(eq(appDir2), isA(FsPermission.class));
 
     // start another application with the app dir already created and verify
     // we do not try to create it again
-    ApplicationId appId3 = BuilderUtils.newApplicationId(1, 3);
-    Path appDir3 = new Path(suffixDir, appId3.toString());
+    ApplicationId appId3 = BuilderUtils.newApplicationId(2, 2);
+    Path appDir3 = fs.makeQualified(LogAggregationUtils.getRemoteAppLogDir(
+        new Path(remoteRootLogDir.getAbsolutePath()),
+            appId3, this.user, inputSuffix));
     new File(appDir3.toUri().getPath()).mkdir();
     aggSvc.handle(new LogHandlerAppStartedEvent(appId3, this.user, null,
         this.acls, contextWithAllContainers));
     verify(spyFs, never()).mkdirs(eq(appDir3), isA(FsPermission.class));
+
+
+    // Verify we do not create bucket dir again
+    ApplicationId appId4 = BuilderUtils.newApplicationId(2, 10003);
+    Path bucketDir4 = fs.makeQualified(LogAggregationUtils.getRemoteBucketDir(
+        new Path(remoteRootLogDir.getAbsolutePath()),
+        this.user, logSuffix, appId4));
+    new File(bucketDir4.toUri().getPath()).mkdir();
+    Path appDir4 = fs.makeQualified(LogAggregationUtils.getRemoteAppLogDir(
+            new Path(remoteRootLogDir.getAbsolutePath()),
+            appId4, this.user, inputSuffix));
+    aggSvc.handle(new LogHandlerAppStartedEvent(appId4, this.user, null,
+        this.acls, contextWithAllContainers));
+    verify(spyFs, never()).mkdirs(eq(bucketDir4), isA(FsPermission.class));
+    verify(spyFs).mkdirs(eq(appDir4), isA(FsPermission.class));
+
     aggSvc.stop();
     aggSvc.close();
     dispatcher.stop();


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to