YARN-8273. Log aggregation does not warn if HDFS quota in target directory is 
exceeded (grepas via rkanter)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b22f56c4
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b22f56c4
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b22f56c4

Branch: refs/heads/HDDS-4
Commit: b22f56c4719e63bd4f6edc2a075e0bcdb9442255
Parents: 83f53e5
Author: Robert Kanter <rkan...@apache.org>
Authored: Tue May 22 14:24:38 2018 -0700
Committer: Robert Kanter <rkan...@apache.org>
Committed: Tue May 22 14:24:38 2018 -0700

----------------------------------------------------------------------
 .../hadoop-yarn/hadoop-yarn-common/pom.xml      |  4 ++
 .../logaggregation/AggregatedLogFormat.java     | 14 +++-
 .../LogAggregationDFSException.java             | 45 ++++++++++++
 .../LogAggregationFileController.java           |  4 +-
 .../tfile/LogAggregationTFileController.java    | 13 +++-
 .../logaggregation/TestContainerLogsUtils.java  |  4 +-
 .../logaggregation/AppLogAggregatorImpl.java    | 49 ++++++++++---
 .../TestAppLogAggregatorImpl.java               | 75 +++++++++++++++++---
 .../nodemanager/webapp/TestNMWebServices.java   |  7 +-
 9 files changed, 183 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b22f56c4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml
index db6c11a..a25c524 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml
@@ -40,6 +40,10 @@
       <artifactId>hadoop-common</artifactId>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs-client</artifactId>
+    </dependency>
 
 
     <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b22f56c4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
index af3066e..81d5053 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.SecureIOUtils;
 import org.apache.hadoop.io.Writable;
@@ -547,7 +548,7 @@ public class AggregatedLogFormat {
     }
 
     @Override
-    public void close() {
+    public void close() throws DSQuotaExceededException {
       try {
         if (writer != null) {
           writer.close();
@@ -555,7 +556,16 @@ public class AggregatedLogFormat {
       } catch (Exception e) {
         LOG.warn("Exception closing writer", e);
       } finally {
-        IOUtils.cleanupWithLogger(LOG, this.fsDataOStream);
+        try {
+          this.fsDataOStream.close();
+        } catch (DSQuotaExceededException e) {
+          LOG.error("Exception in closing {}",
+              this.fsDataOStream.getClass(), e);
+          throw e;
+        } catch (Throwable e) {
+          LOG.error("Exception in closing {}",
+              this.fsDataOStream.getClass(), e);
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b22f56c4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationDFSException.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationDFSException.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationDFSException.java
new file mode 100644
index 0000000..19953e4
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationDFSException.java
@@ -0,0 +1,45 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.logaggregation.filecontroller;
+
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+/**
+ * This exception class indicates an issue during log aggregation.
+ */
+public class LogAggregationDFSException extends YarnException {
+
+  private static final long serialVersionUID = -6691549081090183145L;
+
+  public LogAggregationDFSException() {
+  }
+
+  public LogAggregationDFSException(String message) {
+    super(message);
+  }
+
+  public LogAggregationDFSException(Throwable cause) {
+    super(cause);
+  }
+
+  public LogAggregationDFSException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b22f56c4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
index 5ac89e9..d342e3f 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
@@ -162,8 +162,10 @@ public abstract class LogAggregationFileController {
 
   /**
    * Close the writer.
+   * @throws LogAggregationDFSException if the closing of the writer fails
+   *         (for example due to HDFS quota being exceeded)
    */
-  public abstract void closeWriter();
+  public abstract void closeWriter() throws LogAggregationDFSException;
 
   /**
    * Write the log content.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b22f56c4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java
index a4f50d2..e87af7f 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.HarFs;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -46,6 +47,7 @@ import 
org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;
+import 
org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationDFSException;
 import 
org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
 import 
org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
 import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType;
@@ -95,10 +97,15 @@ public class LogAggregationTFileController
   }
 
   @Override
-  public void closeWriter() {
+  public void closeWriter() throws LogAggregationDFSException {
     if (this.writer != null) {
-      this.writer.close();
-      this.writer = null;
+      try {
+        this.writer.close();
+      } catch (DSQuotaExceededException e) {
+        throw new LogAggregationDFSException(e);
+      } finally {
+        this.writer = null;
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b22f56c4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java
index a12e2a1..4767282 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java
@@ -65,7 +65,7 @@ public final class TestContainerLogsUtils {
   public static void createContainerLogFileInRemoteFS(Configuration conf,
       FileSystem fs, String rootLogDir, ContainerId containerId, NodeId nodeId,
       String fileName, String user, String content,
-      boolean deleteRemoteLogDir) throws IOException {
+      boolean deleteRemoteLogDir) throws Exception {
     UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
     //prepare the logs for remote directory
     ApplicationId appId = containerId.getApplicationAttemptId()
@@ -113,7 +113,7 @@ public final class TestContainerLogsUtils {
 
   private static void uploadContainerLogIntoRemoteDir(UserGroupInformation ugi,
       Configuration configuration, List<String> rootLogDirs, NodeId nodeId,
-      ContainerId containerId, Path appDir, FileSystem fs) throws IOException {
+      ContainerId containerId, Path appDir, FileSystem fs) throws Exception {
     Path path =
         new Path(appDir, LogAggregationUtils.getNodeString(nodeId));
     LogAggregationFileControllerFactory factory

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b22f56c4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
index c7e06ff..5956823 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
+import 
org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationDFSException;
 import 
org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
 import 
org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
 import 
org.apache.hadoop.yarn.logaggregation.filecontroller.tfile.LogAggregationTFileController;
@@ -263,7 +264,8 @@ public class AppLogAggregatorImpl implements 
AppLogAggregator {
     return params;
   }
 
-  private void uploadLogsForContainers(boolean appFinished) {
+  private void uploadLogsForContainers(boolean appFinished)
+      throws LogAggregationDFSException {
     if (this.logAggregationDisabled) {
       return;
     }
@@ -301,6 +303,7 @@ public class AppLogAggregatorImpl implements 
AppLogAggregator {
     logAggregationTimes++;
     String diagnosticMessage = "";
     boolean logAggregationSucceedInThisCycle = true;
+    DeletionTask deletionTask = null;
     try {
       try {
         logAggregationFileController.initializeWriter(logControllerContext);
@@ -327,10 +330,9 @@ public class AppLogAggregatorImpl implements 
AppLogAggregator {
           uploadedLogsInThisCycle = true;
           List<Path> uploadedFilePathsInThisCycleList = new ArrayList<>();
           
uploadedFilePathsInThisCycleList.addAll(uploadedFilePathsInThisCycle);
-          DeletionTask deletionTask = new FileDeletionTask(delService,
+          deletionTask = new FileDeletionTask(delService,
               this.userUgi.getShortUserName(), null,
               uploadedFilePathsInThisCycleList);
-          delService.delete(deletionTask);
         }
 
         // This container is finished, and all its logs have been uploaded,
@@ -356,9 +358,23 @@ public class AppLogAggregatorImpl implements 
AppLogAggregator {
         logAggregationSucceedInThisCycle = false;
       }
     } finally {
+      LogAggregationDFSException exc = null;
+      try {
+        this.logAggregationFileController.closeWriter();
+      } catch (LogAggregationDFSException e) {
+        diagnosticMessage = e.getMessage();
+        renameTemporaryLogFileFailed = true;
+        logAggregationSucceedInThisCycle = false;
+        exc = e;
+      }
+      if (logAggregationSucceedInThisCycle && deletionTask != null) {
+        delService.delete(deletionTask);
+      }
       sendLogAggregationReport(logAggregationSucceedInThisCycle,
           diagnosticMessage, appFinished);
-      logAggregationFileController.closeWriter();
+      if (exc != null) {
+        throw exc;
+      }
     }
   }
 
@@ -413,13 +429,18 @@ public class AppLogAggregatorImpl implements 
AppLogAggregator {
         diagnosticMessage, finalized);
   }
 
-  @SuppressWarnings("unchecked")
   @Override
   public void run() {
     try {
       doAppLogAggregation();
+    } catch (LogAggregationDFSException e) {
+      // if the log aggregation could not be performed due to DFS issues
+      // let's not clean up the log files, since that can result in
+      // loss of logs
+      LOG.error("Error occurred while aggregating the log for the application "
+          + appId, e);
     } catch (Exception e) {
-      // do post clean up of log directories on any exception
+      // do post clean up of log directories on any other exception
       LOG.error("Error occurred while aggregating the log for the application "
           + appId, e);
       doAppLogAggregationPostCleanUp();
@@ -434,8 +455,7 @@ public class AppLogAggregatorImpl implements 
AppLogAggregator {
     }
   }
 
-  @SuppressWarnings("unchecked")
-  private void doAppLogAggregation() {
+  private void doAppLogAggregation() throws LogAggregationDFSException {
     while (!this.appFinishing.get() && !this.aborted.get()) {
       synchronized(this) {
         try {
@@ -452,6 +472,9 @@ public class AppLogAggregatorImpl implements 
AppLogAggregator {
         } catch (InterruptedException e) {
           LOG.warn("PendingContainers queue is interrupted");
           this.appFinishing.set(true);
+        } catch (LogAggregationDFSException e) {
+          this.appFinishing.set(true);
+          throw e;
         }
       }
     }
@@ -460,10 +483,14 @@ public class AppLogAggregatorImpl implements 
AppLogAggregator {
       return;
     }
 
-    // App is finished, upload the container logs.
-    uploadLogsForContainers(true);
+    try {
+      // App is finished, upload the container logs.
+      uploadLogsForContainers(true);
 
-    doAppLogAggregationPostCleanUp();
+      doAppLogAggregationPostCleanUp();
+    } catch (LogAggregationDFSException e) {
+      LOG.error("Error during log aggregation", e);
+    }
 
     this.dispatcher.getEventHandler().handle(
         new ApplicationEvent(this.appId,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b22f56c4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java
index e13c805..95f4c32 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
+import 
org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationDFSException;
 import 
org.apache.hadoop.yarn.logaggregation.filecontroller.tfile.LogAggregationTFileController;
 import org.apache.hadoop.yarn.server.api.ContainerLogContext;
 import org.apache.hadoop.yarn.server.api.ContainerType;
@@ -42,7 +43,9 @@ import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask;
+import 
org.apache.hadoop.yarn.server.nodemanager.logaggregation.tracker.NMLogAggregationStatusTracker;
 import 
org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
 import 
org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
 import 
org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
@@ -52,12 +55,14 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -228,10 +233,15 @@ public class TestAppLogAggregatorImpl {
     config.setLong(
         YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, logRetentionSecs);
 
+    LogAggregationTFileController format = spy(
+        new LogAggregationTFileController());
+    format.initialize(config, "TFile");
+
+    Context context = createContext(config);
     final AppLogAggregatorInTest appLogAggregator =
         createAppLogAggregator(appId, LOCAL_LOG_DIR.getAbsolutePath(),
-            config, recoveredLogInitedTimeMillis,
-            deletionServiceWithExpectedFiles);
+            config, context, recoveredLogInitedTimeMillis,
+            deletionServiceWithExpectedFiles, format);
     appLogAggregator.startContainerLogAggregation(
         new ContainerLogContext(containerId, ContainerType.TASK, 0));
     // set app finished flag first
@@ -269,8 +279,10 @@ public class TestAppLogAggregatorImpl {
 
   private static AppLogAggregatorInTest createAppLogAggregator(
       ApplicationId applicationId, String rootLogDir,
-      YarnConfiguration config, long recoveredLogInitedTimeMillis,
-      DeletionService deletionServiceWithFilesToExpect)
+      YarnConfiguration config, Context context,
+      long recoveredLogInitedTimeMillis,
+      DeletionService deletionServiceWithFilesToExpect,
+      LogAggregationTFileController tFileController)
       throws IOException {
 
     final Dispatcher dispatcher = createNullDispatcher();
@@ -284,16 +296,12 @@ public class TestAppLogAggregatorImpl {
     final LogAggregationContext logAggregationContext = null;
     final Map<ApplicationAccessType, String> appAcls = new HashMap<>();
 
-    final Context context = createContext(config);
     final FileContext fakeLfs = mock(FileContext.class);
     final Path remoteLogDirForApp = new 
Path(REMOTE_LOG_FILE.getAbsolutePath());
-    LogAggregationTFileController format = spy(
-        new LogAggregationTFileController());
-    format.initialize(config, "TFile");
     return new AppLogAggregatorInTest(dispatcher, deletionService,
         config, applicationId, ugi, nodeId, dirsService,
         remoteLogDirForApp, appAcls, logAggregationContext,
-        context, fakeLfs, recoveredLogInitedTimeMillis, format);
+        context, fakeLfs, recoveredLogInitedTimeMillis, tFileController);
   }
 
   /**
@@ -423,4 +431,53 @@ public class TestAppLogAggregatorImpl {
       this.logValue = ArgumentCaptor.forClass(LogValue.class);
     }
   }
+
+  @Test
+  public void testDFSQuotaExceeded() throws Exception {
+
+    // the expectation is that no log files are deleted if the quota has
+    // been exceeded, since that would result in loss of logs
+    DeletionService deletionServiceWithExpectedFiles =
+        createDeletionServiceWithExpectedFile2Delete(Collections.emptySet());
+
+    final YarnConfiguration config = new YarnConfiguration();
+
+    ApplicationId appId = ApplicationId.newInstance(1357543L, 1);
+
+    // we need a LogAggregationTFileController that throws a
+    // LogAggregationDFSException
+    LogAggregationTFileController format =
+        Mockito.mock(LogAggregationTFileController.class);
+    Mockito.doThrow(new LogAggregationDFSException())
+        .when(format).closeWriter();
+
+    NodeManager.NMContext context = (NMContext) createContext(config);
+    context.setNMLogAggregationStatusTracker(
+        Mockito.mock(NMLogAggregationStatusTracker.class));
+
+    final AppLogAggregatorInTest appLogAggregator =
+        createAppLogAggregator(appId, LOCAL_LOG_DIR.getAbsolutePath(),
+            config, context, 1000L, deletionServiceWithExpectedFiles, format);
+
+    appLogAggregator.startContainerLogAggregation(
+        new ContainerLogContext(
+            ContainerId.newContainerId(
+                ApplicationAttemptId.newInstance(appId, 0), 0),
+            ContainerType.TASK, 0));
+    // set app finished flag first
+    appLogAggregator.finishLogAggregation();
+    appLogAggregator.run();
+
+    // verify that no files have been uploaded
+    ArgumentCaptor<LogValue> logValCaptor =
+        ArgumentCaptor.forClass(LogValue.class);
+    verify(appLogAggregator.getLogAggregationFileController()).write(
+        any(LogKey.class), logValCaptor.capture());
+    Set<String> filesUploaded = new HashSet<>();
+    LogValue logValue = logValCaptor.getValue();
+    for (File file: logValue.getPendingLogFilesToUploadForThisContainer()) {
+      filesUploaded.add(file.getAbsolutePath());
+    }
+    verifyFilesUploaded(filesUploaded, Collections.emptySet());
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b22f56c4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java
index 39e403d..dbd980b 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java
@@ -87,7 +87,6 @@ import javax.ws.rs.core.MediaType;
 import javax.xml.parsers.DocumentBuilder;
 import javax.xml.parsers.DocumentBuilderFactory;
 import java.io.File;
-import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringReader;
 import java.net.HttpURLConnection;
@@ -356,7 +355,7 @@ public class TestNMWebServices extends JerseyTestBase {
   }
 
   @Test (timeout = 5000)
-  public void testContainerLogsWithNewAPI() throws IOException, JSONException{
+  public void testContainerLogsWithNewAPI() throws Exception {
     final ContainerId containerId = BuilderUtils.newContainerId(0, 0, 0, 0);
     WebResource r = resource();
     r = r.path("ws").path("v1").path("node").path("containers")
@@ -365,7 +364,7 @@ public class TestNMWebServices extends JerseyTestBase {
   }
 
   @Test (timeout = 5000)
-  public void testContainerLogsWithOldAPI() throws IOException, JSONException{
+  public void testContainerLogsWithOldAPI() throws Exception {
     final ContainerId containerId = BuilderUtils.newContainerId(1, 1, 0, 1);
     WebResource r = resource();
     r = r.path("ws").path("v1").path("node").path("containerlogs")
@@ -538,7 +537,7 @@ public class TestNMWebServices extends JerseyTestBase {
   }
 
   private void testContainerLogs(WebResource r, ContainerId containerId)
-      throws IOException {
+      throws Exception {
     final String containerIdStr = containerId.toString();
     final ApplicationAttemptId appAttemptId = containerId
         .getApplicationAttemptId();


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to