YARN-8273. Log aggregation does not warn if HDFS quota in target directory is exceeded (grepas via rkanter)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b22f56c4 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b22f56c4 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b22f56c4 Branch: refs/heads/HDDS-4 Commit: b22f56c4719e63bd4f6edc2a075e0bcdb9442255 Parents: 83f53e5 Author: Robert Kanter <rkan...@apache.org> Authored: Tue May 22 14:24:38 2018 -0700 Committer: Robert Kanter <rkan...@apache.org> Committed: Tue May 22 14:24:38 2018 -0700 ---------------------------------------------------------------------- .../hadoop-yarn/hadoop-yarn-common/pom.xml | 4 ++ .../logaggregation/AggregatedLogFormat.java | 14 +++- .../LogAggregationDFSException.java | 45 ++++++++++++ .../LogAggregationFileController.java | 4 +- .../tfile/LogAggregationTFileController.java | 13 +++- .../logaggregation/TestContainerLogsUtils.java | 4 +- .../logaggregation/AppLogAggregatorImpl.java | 49 ++++++++++--- .../TestAppLogAggregatorImpl.java | 75 +++++++++++++++++--- .../nodemanager/webapp/TestNMWebServices.java | 7 +- 9 files changed, 183 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/b22f56c4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml index db6c11a..a25c524 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml @@ -40,6 +40,10 @@ <artifactId>hadoop-common</artifactId> <scope>provided</scope> </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs-client</artifactId> + </dependency> <!-- 'mvn dependency:analyze' fails to detect use of this dependency --> http://git-wip-us.apache.org/repos/asf/hadoop/blob/b22f56c4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java index af3066e..81d5053 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java @@ -58,6 +58,7 @@ import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.SecureIOUtils; import org.apache.hadoop.io.Writable; @@ -547,7 +548,7 @@ public class AggregatedLogFormat { } @Override - public void close() { + public void close() throws DSQuotaExceededException { try { if (writer != null) { writer.close(); @@ -555,7 +556,16 @@ public class AggregatedLogFormat { } catch (Exception e) { LOG.warn("Exception closing writer", e); } finally { - IOUtils.cleanupWithLogger(LOG, this.fsDataOStream); + try { + this.fsDataOStream.close(); + } catch (DSQuotaExceededException e) { + LOG.error("Exception in closing {}", + this.fsDataOStream.getClass(), e); + throw e; + } catch (Throwable e) { + LOG.error("Exception in closing {}", + this.fsDataOStream.getClass(), e); + } } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b22f56c4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationDFSException.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationDFSException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationDFSException.java new file mode 100644 index 0000000..19953e4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationDFSException.java @@ -0,0 +1,45 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.logaggregation.filecontroller; + +import org.apache.hadoop.yarn.exceptions.YarnException; + +/** + * This exception class indicates an issue during log aggregation. + */ +public class LogAggregationDFSException extends YarnException { + + private static final long serialVersionUID = -6691549081090183145L; + + public LogAggregationDFSException() { + } + + public LogAggregationDFSException(String message) { + super(message); + } + + public LogAggregationDFSException(Throwable cause) { + super(cause); + } + + public LogAggregationDFSException(String message, Throwable cause) { + super(message, cause); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b22f56c4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java index 5ac89e9..d342e3f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java @@ -162,8 +162,10 @@ public abstract class LogAggregationFileController { /** * Close the writer. + * @throws LogAggregationDFSException if the closing of the writer fails + * (for example due to HDFS quota being exceeded) */ - public abstract void closeWriter(); + public abstract void closeWriter() throws LogAggregationDFSException; /** * Write the log content. http://git-wip-us.apache.org/repos/asf/hadoop/blob/b22f56c4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java index a4f50d2..e87af7f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java @@ -38,6 +38,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.HarFs; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -46,6 +47,7 @@ import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter; +import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationDFSException; import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController; import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext; import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType; @@ -95,10 +97,15 @@ public class LogAggregationTFileController } @Override - public void closeWriter() { + public void closeWriter() throws LogAggregationDFSException { if (this.writer != null) { - this.writer.close(); - this.writer = null; + try { + this.writer.close(); + } catch (DSQuotaExceededException e) { + throw new LogAggregationDFSException(e); + } finally { + this.writer = null; + } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b22f56c4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java index a12e2a1..4767282 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java @@ -65,7 +65,7 @@ public final class TestContainerLogsUtils { public static void createContainerLogFileInRemoteFS(Configuration conf, FileSystem fs, String rootLogDir, ContainerId containerId, NodeId nodeId, String fileName, String user, String content, - boolean deleteRemoteLogDir) throws IOException { + boolean deleteRemoteLogDir) throws Exception { UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user); //prepare the logs for remote directory ApplicationId appId = containerId.getApplicationAttemptId() @@ -113,7 +113,7 @@ public final class TestContainerLogsUtils { private static void uploadContainerLogIntoRemoteDir(UserGroupInformation ugi, Configuration configuration, List<String> rootLogDirs, NodeId nodeId, - ContainerId containerId, Path appDir, FileSystem fs) throws IOException { + ContainerId containerId, Path appDir, FileSystem fs) throws Exception { Path path = new Path(appDir, LogAggregationUtils.getNodeString(nodeId)); LogAggregationFileControllerFactory factory http://git-wip-us.apache.org/repos/asf/hadoop/blob/b22f56c4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java index c7e06ff..5956823 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java @@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue; +import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationDFSException; import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController; import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext; import org.apache.hadoop.yarn.logaggregation.filecontroller.tfile.LogAggregationTFileController; @@ -263,7 +264,8 @@ public class AppLogAggregatorImpl implements AppLogAggregator { return params; } - private void uploadLogsForContainers(boolean appFinished) { + private void uploadLogsForContainers(boolean appFinished) + throws LogAggregationDFSException { if (this.logAggregationDisabled) { return; } @@ -301,6 +303,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator { logAggregationTimes++; String diagnosticMessage = ""; boolean logAggregationSucceedInThisCycle = true; + DeletionTask deletionTask = null; try { try { logAggregationFileController.initializeWriter(logControllerContext); @@ -327,10 +330,9 @@ public class AppLogAggregatorImpl implements AppLogAggregator { uploadedLogsInThisCycle = true; List<Path> uploadedFilePathsInThisCycleList = new ArrayList<>(); uploadedFilePathsInThisCycleList.addAll(uploadedFilePathsInThisCycle); - DeletionTask deletionTask = new FileDeletionTask(delService, + deletionTask = new FileDeletionTask(delService, this.userUgi.getShortUserName(), null, uploadedFilePathsInThisCycleList); - delService.delete(deletionTask); } // This container is finished, and all its logs have been uploaded, @@ -356,9 +358,23 @@ public class AppLogAggregatorImpl implements AppLogAggregator { logAggregationSucceedInThisCycle = false; } } finally { + LogAggregationDFSException exc = null; + try { + this.logAggregationFileController.closeWriter(); + } catch (LogAggregationDFSException e) { + diagnosticMessage = e.getMessage(); + renameTemporaryLogFileFailed = true; + logAggregationSucceedInThisCycle = false; + exc = e; + } + if (logAggregationSucceedInThisCycle && deletionTask != null) { + delService.delete(deletionTask); + } sendLogAggregationReport(logAggregationSucceedInThisCycle, diagnosticMessage, appFinished); - logAggregationFileController.closeWriter(); + if (exc != null) { + throw exc; + } } } @@ -413,13 +429,18 @@ public class AppLogAggregatorImpl implements AppLogAggregator { diagnosticMessage, finalized); } - @SuppressWarnings("unchecked") @Override public void run() { try { doAppLogAggregation(); + } catch (LogAggregationDFSException e) { + // if the log aggregation could not be performed due to DFS issues + // let's not clean up the log files, since that can result in + // loss of logs + LOG.error("Error occurred while aggregating the log for the application " + + appId, e); } catch (Exception e) { - // do post clean up of log directories on any exception + // do post clean up of log directories on any other exception LOG.error("Error occurred while aggregating the log for the application " + appId, e); doAppLogAggregationPostCleanUp(); @@ -434,8 +455,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator { } } - @SuppressWarnings("unchecked") - private void doAppLogAggregation() { + private void doAppLogAggregation() throws LogAggregationDFSException { while (!this.appFinishing.get() && !this.aborted.get()) { synchronized(this) { try { @@ -452,6 +472,9 @@ public class AppLogAggregatorImpl implements AppLogAggregator { } catch (InterruptedException e) { LOG.warn("PendingContainers queue is interrupted"); this.appFinishing.set(true); + } catch (LogAggregationDFSException e) { + this.appFinishing.set(true); + throw e; } } } @@ -460,10 +483,14 @@ public class AppLogAggregatorImpl implements AppLogAggregator { return; } - // App is finished, upload the container logs. - uploadLogsForContainers(true); + try { + // App is finished, upload the container logs. + uploadLogsForContainers(true); - doAppLogAggregationPostCleanUp(); + doAppLogAggregationPostCleanUp(); + } catch (LogAggregationDFSException e) { + LOG.error("Error during log aggregation", e); + } this.dispatcher.getEventHandler().handle( new ApplicationEvent(this.appId, http://git-wip-us.apache.org/repos/asf/hadoop/blob/b22f56c4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java index e13c805..95f4c32 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue; +import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationDFSException; import org.apache.hadoop.yarn.logaggregation.filecontroller.tfile.LogAggregationTFileController; import org.apache.hadoop.yarn.server.api.ContainerLogContext; import org.apache.hadoop.yarn.server.api.ContainerType; @@ -42,7 +43,9 @@ import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.NodeManager; +import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask; +import org.apache.hadoop.yarn.server.nodemanager.logaggregation.tracker.NMLogAggregationStatusTracker; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; @@ -52,12 +55,14 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -228,10 +233,15 @@ public class TestAppLogAggregatorImpl { config.setLong( YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, logRetentionSecs); + LogAggregationTFileController format = spy( + new LogAggregationTFileController()); + format.initialize(config, "TFile"); + + Context context = createContext(config); final AppLogAggregatorInTest appLogAggregator = createAppLogAggregator(appId, LOCAL_LOG_DIR.getAbsolutePath(), - config, recoveredLogInitedTimeMillis, - deletionServiceWithExpectedFiles); + config, context, recoveredLogInitedTimeMillis, + deletionServiceWithExpectedFiles, format); appLogAggregator.startContainerLogAggregation( new ContainerLogContext(containerId, ContainerType.TASK, 0)); // set app finished flag first @@ -269,8 +279,10 @@ public class TestAppLogAggregatorImpl { private static AppLogAggregatorInTest createAppLogAggregator( ApplicationId applicationId, String rootLogDir, - YarnConfiguration config, long recoveredLogInitedTimeMillis, - DeletionService deletionServiceWithFilesToExpect) + YarnConfiguration config, Context context, + long recoveredLogInitedTimeMillis, + DeletionService deletionServiceWithFilesToExpect, + LogAggregationTFileController tFileController) throws IOException { final Dispatcher dispatcher = createNullDispatcher(); @@ -284,16 +296,12 @@ public class TestAppLogAggregatorImpl { final LogAggregationContext logAggregationContext = null; final Map<ApplicationAccessType, String> appAcls = new HashMap<>(); - final Context context = createContext(config); final FileContext fakeLfs = mock(FileContext.class); final Path remoteLogDirForApp = new Path(REMOTE_LOG_FILE.getAbsolutePath()); - LogAggregationTFileController format = spy( - new LogAggregationTFileController()); - format.initialize(config, "TFile"); return new AppLogAggregatorInTest(dispatcher, deletionService, config, applicationId, ugi, nodeId, dirsService, remoteLogDirForApp, appAcls, logAggregationContext, - context, fakeLfs, recoveredLogInitedTimeMillis, format); + context, fakeLfs, recoveredLogInitedTimeMillis, tFileController); } /** @@ -423,4 +431,53 @@ public class TestAppLogAggregatorImpl { this.logValue = ArgumentCaptor.forClass(LogValue.class); } } + + @Test + public void testDFSQuotaExceeded() throws Exception { + + // the expectation is that no log files are deleted if the quota has + // been exceeded, since that would result in loss of logs + DeletionService deletionServiceWithExpectedFiles = + createDeletionServiceWithExpectedFile2Delete(Collections.emptySet()); + + final YarnConfiguration config = new YarnConfiguration(); + + ApplicationId appId = ApplicationId.newInstance(1357543L, 1); + + // we need a LogAggregationTFileController that throws a + // LogAggregationDFSException + LogAggregationTFileController format = + Mockito.mock(LogAggregationTFileController.class); + Mockito.doThrow(new LogAggregationDFSException()) + .when(format).closeWriter(); + + NodeManager.NMContext context = (NMContext) createContext(config); + context.setNMLogAggregationStatusTracker( + Mockito.mock(NMLogAggregationStatusTracker.class)); + + final AppLogAggregatorInTest appLogAggregator = + createAppLogAggregator(appId, LOCAL_LOG_DIR.getAbsolutePath(), + config, context, 1000L, deletionServiceWithExpectedFiles, format); + + appLogAggregator.startContainerLogAggregation( + new ContainerLogContext( + ContainerId.newContainerId( + ApplicationAttemptId.newInstance(appId, 0), 0), + ContainerType.TASK, 0)); + // set app finished flag first + appLogAggregator.finishLogAggregation(); + appLogAggregator.run(); + + // verify that no files have been uploaded + ArgumentCaptor<LogValue> logValCaptor = + ArgumentCaptor.forClass(LogValue.class); + verify(appLogAggregator.getLogAggregationFileController()).write( + any(LogKey.class), logValCaptor.capture()); + Set<String> filesUploaded = new HashSet<>(); + LogValue logValue = logValCaptor.getValue(); + for (File file: logValue.getPendingLogFilesToUploadForThisContainer()) { + filesUploaded.add(file.getAbsolutePath()); + } + verifyFilesUploaded(filesUploaded, Collections.emptySet()); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b22f56c4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java index 39e403d..dbd980b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java @@ -87,7 +87,6 @@ import javax.ws.rs.core.MediaType; import javax.xml.parsers.DocumentBuilder; import javax.xml.parsers.DocumentBuilderFactory; import java.io.File; -import java.io.IOException; import java.io.PrintWriter; import java.io.StringReader; import java.net.HttpURLConnection; @@ -356,7 +355,7 @@ public class TestNMWebServices extends JerseyTestBase { } @Test (timeout = 5000) - public void testContainerLogsWithNewAPI() throws IOException, JSONException{ + public void testContainerLogsWithNewAPI() throws Exception { final ContainerId containerId = BuilderUtils.newContainerId(0, 0, 0, 0); WebResource r = resource(); r = r.path("ws").path("v1").path("node").path("containers") @@ -365,7 +364,7 @@ public class TestNMWebServices extends JerseyTestBase { } @Test (timeout = 5000) - public void testContainerLogsWithOldAPI() throws IOException, JSONException{ + public void testContainerLogsWithOldAPI() throws Exception { final ContainerId containerId = BuilderUtils.newContainerId(1, 1, 0, 1); WebResource r = resource(); r = r.path("ws").path("v1").path("node").path("containerlogs") @@ -538,7 +537,7 @@ public class TestNMWebServices extends JerseyTestBase { } private void testContainerLogs(WebResource r, ContainerId containerId) - throws IOException { + throws Exception { final String containerIdStr = containerId.toString(); final ApplicationAttemptId appAttemptId = containerId .getApplicationAttemptId(); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org