YARN-6011. Add a new web service to list the files on a container in 
AHSWebService. Contributed by Xuan Gong.

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/cf695577
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/cf695577
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/cf695577

Branch: refs/heads/YARN-2915
Commit: cf695577aa6d9715b77ab7309ecd792ef226c439
Parents: d1d0b3e
Author: Junping Du <junping...@apache.org>
Authored: Mon Jan 16 16:20:24 2017 -0800
Committer: Junping Du <junping...@apache.org>
Committed: Mon Jan 16 16:20:24 2017 -0800

 .../webapp/AHSWebServices.java                  | 202 +++++++++++++++++++
 .../webapp/TestAHSWebServices.java              | 112 ++++++++++
 .../server/webapp/dao/ContainerLogsInfo.java    | 101 ++++++++++
 3 files changed, 415 insertions(+)

diff --git 
index ccc5ff9..23e04ef 100644
@@ -24,6 +24,8 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.charset.Charset;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Set;
 import javax.servlet.http.HttpServletRequest;
@@ -40,6 +42,7 @@ import javax.ws.rs.core.Response;
 import javax.ws.rs.core.StreamingOutput;
 import javax.ws.rs.core.Response.ResponseBuilder;
 import javax.ws.rs.core.Response.Status;
+import org.apache.commons.math3.util.Pair;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
@@ -57,12 +60,14 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
 import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
 import org.apache.hadoop.yarn.server.webapp.WebServices;
 import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
 import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptsInfo;
 import org.apache.hadoop.yarn.server.webapp.dao.AppInfo;
 import org.apache.hadoop.yarn.server.webapp.dao.AppsInfo;
 import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
+import org.apache.hadoop.yarn.server.webapp.dao.ContainerLogsInfo;
 import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
 import org.apache.hadoop.yarn.util.Times;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
@@ -213,6 +218,115 @@ public class AHSWebServices extends WebServices {
+  // TODO: YARN-6080: Create WebServiceUtils to have common functions used in
+  //       RMWebService, NMWebService and AHSWebService.
+  /**
+   * Returns log file's name as well as current file size for a container.
+   * <p>
+   * The owning application is looked up first. If that lookup throws, the
+   * meta-data is read from the aggregated logs with neither owner nor node
+   * known. Otherwise the container is looked up: for a running application
+   * the caller receives a temporary redirect whose Location is built from
+   * the container's node HTTP address (the original query string, if any,
+   * is appended); for a finished application the meta-data is read from
+   * the aggregated logs.
+   *
+   * @param req
+   *    HttpServletRequest
+   * @param res
+   *    HttpServletResponse
+   * @param containerIdStr
+   *    The container ID
+   * @return
+   *    The log file's name and current file size, a temporary redirect
+   *    for a running application, an INTERNAL_SERVER_ERROR response when
+   *    the container cannot be fetched and the application is not
+   *    finished, or a NOT_FOUND response when the application is neither
+   *    running nor finished. A BadRequestException is thrown when
+   *    containerIdStr cannot be parsed as a container ID.
+   */
+  @GET
+  @Path("/containers/{containerid}/logs")
+  @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+  public Response getContainerLogsInfo(
+      @Context HttpServletRequest req,
+      @Context HttpServletResponse res,
+      @PathParam("containerid") String containerIdStr) {
+    ContainerId containerId = null;
+    init(res);
+    try {
+      containerId = ContainerId.fromString(containerIdStr);
+    } catch (Exception e) {
+      throw new BadRequestException("invalid container id, " + containerIdStr);
+    }
+    ApplicationId appId = containerId.getApplicationAttemptId()
+        .getApplicationId();
+    AppInfo appInfo;
+    try {
+      appInfo = super.getApp(req, res, appId.toString());
+    } catch (Exception ex) {
+      // App lookup failed: read the logs from HDFS, owner and node unknown.
+      return getContainerLogMeta(appId, null, null, containerIdStr);
+    }
+    String appOwner = appInfo.getUser();
+    ContainerInfo containerInfo;
+    try {
+      containerInfo = super.getContainer(
+          req, res, appId.toString(),
+          containerId.getApplicationAttemptId().toString(),
+          containerId.toString());
+    } catch (Exception ex) {
+      if (isFinishedState(appInfo.getAppState())) {
+        // Container unknown but app finished: read logs from HDFS by owner.
+        return getContainerLogMeta(appId, appOwner, null, containerIdStr);
+      }
+      return createBadResponse(Status.INTERNAL_SERVER_ERROR,
+          "Can not get ContainerInfo for the container: " + containerId);
+    }
+    String nodeId = containerInfo.getNodeId();
+    if (isRunningState(appInfo.getAppState())) {
+      String nodeHttpAddress = containerInfo.getNodeHttpAddress();
+      String uri = "/" + containerId.toString() + "/logs";
+      String resURI = JOINER.join(nodeHttpAddress, NM_DOWNLOAD_URI_STR, uri);
+      String query = req.getQueryString();
+      if (query != null && !query.isEmpty()) {
+        resURI += "?" + query;
+      }
+      ResponseBuilder response = Response.status(
+          HttpServletResponse.SC_TEMPORARY_REDIRECT);
+      response.header("Location", resURI);
+      return response.build();
+    } else if (isFinishedState(appInfo.getAppState())) {
+      return getContainerLogMeta(appId, appOwner, nodeId,
+              containerIdStr);
+    } else {
+      return createBadResponse(Status.NOT_FOUND,
+          "The application is not at Running or Finished State.");
+    }
+  }
+  /**
+   * Returns the contents of a container's log file in plain text.
+   * <p>
+   * This endpoint adds no logic of its own: every argument is forwarded
+   * unchanged to {@code getLogs}, which builds the response.
+   *
+   * @param req
+   *    HttpServletRequest
+   * @param res
+   *    HttpServletResponse
+   * @param containerIdStr
+   *    The container ID
+   * @param filename
+   *    The name of the log file
+   * @param format
+   *    The content type, taken from the "format" query parameter
+   * @param size
+   *    the size of the log file, taken from the "size" query parameter
+   *    and passed on as the raw string
+   * @return
+   *    The contents of the container's log file
+   */
+  @GET
+  @Path("/containers/{containerid}/logs/{filename}")
+  @Produces({ MediaType.TEXT_PLAIN })
+  @Public
+  @Unstable
+  public Response getContainerLogFile(@Context HttpServletRequest req,
+      @Context HttpServletResponse res,
+      @PathParam("containerid") String containerIdStr,
+      @PathParam("filename") String filename,
+      @QueryParam("format") String format,
+      @QueryParam("size") String size) {
+    return getLogs(req, res, containerIdStr, filename, format, size);
+  }
+  //TODO: YARN-4993: Refactor ContainersLogsBlock, AggregatedLogsBlock and
+  //      container log webservice introduced in AHS to minimize
+  //      the duplication.
   @Produces({ MediaType.TEXT_PLAIN + "; " + JettyUtils.UTF_8 })
@@ -486,4 +600,92 @@ public class AHSWebServices extends WebServices {
     return Long.parseLong(bytes);
+  /**
+   * Collects the log meta-data of one container from the aggregated log
+   * files in the remote application log directory and returns it wrapped
+   * in a {@link ContainerLogsInfo}.
+   *
+   * @param appId
+   *    The application the container belongs to
+   * @param appOwner
+   *    The owner of the application; when null the application log
+   *    directory is located with a glob over the owner position
+   * @param nodeId
+   *    The node to restrict the search to; when null every node file of
+   *    the application is read
+   * @param containerIdStr
+   *    The container ID
+   * @return
+   *    An OK response carrying the collected meta-data, or an
+   *    INTERNAL_SERVER_ERROR response when the application log directory
+   *    cannot be determined, contains no files, or an exception is thrown
+   */
+  private Response getContainerLogMeta(ApplicationId appId, String appOwner,
+      final String nodeId, final String containerIdStr) {
+    // Log file name -> log file size, in the form ContainerLogsInfo expects.
+    Map<String, String> containerLogMeta = new HashMap<>();
+    try {
+      String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf);
+      org.apache.hadoop.fs.Path remoteRootLogDir =
+          new org.apache.hadoop.fs.Path(conf.get(
+              YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+              YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
+      org.apache.hadoop.fs.Path qualifiedRemoteRootLogDir =
+          FileContext.getFileContext(conf).makeQualified(remoteRootLogDir);
+      FileContext fc = FileContext.getFileContext(
+          qualifiedRemoteRootLogDir.toUri(), conf);
+      org.apache.hadoop.fs.Path remoteAppDir = null;
+      if (appOwner == null) {
+        // Owner unknown: glob with "*" in the owner position and insist on
+        // exactly one match; zero or several matches are reported as errors.
+        org.apache.hadoop.fs.Path toMatch = LogAggregationUtils
+            .getRemoteAppLogDir(remoteRootLogDir, appId, "*", suffix);
+        FileStatus[] matching  = fc.util().globStatus(toMatch);
+        if (matching == null || matching.length != 1) {
+          return createBadResponse(Status.INTERNAL_SERVER_ERROR,
+              "Can not get log meta for container: " + containerIdStr);
+        }
+        remoteAppDir = matching[0].getPath();
+      } else {
+        remoteAppDir = LogAggregationUtils.getRemoteAppLogDir(
+            remoteRootLogDir, appId, appOwner, suffix);
+      }
+      final RemoteIterator<FileStatus> nodeFiles;
+      nodeFiles = fc.listStatus(remoteAppDir);
+      if (!nodeFiles.hasNext()) {
+        return createBadResponse(Status.INTERNAL_SERVER_ERROR,
+            "Can not get log meta for container: " + containerIdStr);
+      }
+      String nodeIdStr = (nodeId == null) ? null
+          : LogAggregationUtils.getNodeString(nodeId);
+      while (nodeFiles.hasNext()) {
+        FileStatus thisNodeFile = nodeFiles.next();
+        // With a known node, only files whose name contains it are read.
+        if (nodeIdStr != null) {
+          if (!thisNodeFile.getPath().getName().contains(nodeIdStr)) {
+            continue;
+          }
+        }
+        // Files carrying the temporary suffix are skipped
+        // (presumably still being written -- confirm).
+        if (!thisNodeFile.getPath().getName().endsWith(
+            LogAggregationUtils.TMP_FILE_SUFFIX)) {
+          AggregatedLogFormat.LogReader reader =
+              new AggregatedLogFormat.LogReader(conf,
+                  thisNodeFile.getPath());
+          try {
+            DataInputStream valueStream;
+            LogKey key = new LogKey();
+            valueStream = reader.next(key);
+            while (valueStream != null) {
+              if (key.toString().equals(containerIdStr)) {
+                // Read one (first, second) pair per log file until the
+                // reader signals the end of the entry with EOFException.
+                while (true) {
+                  try {
+                    Pair<String, String> logMeta =
+                        LogReader.readContainerMetaDataAndSkipData(
+                            valueStream, null);
+                    containerLogMeta.put(logMeta.getFirst(),
+                        logMeta.getSecond());
+                  } catch (EOFException eof) {
+                    break;
+                  }
+                }
+                // Container found in this file; stop scanning its keys.
+                break;
+              }
+              // Next container
+              key = new LogKey();
+              valueStream = reader.next(key);
+            }
+          } finally {
+            reader.close();
+          }
+        }
+      }
+      ResponseBuilder response = Response.ok(new ContainerLogsInfo(
+          containerLogMeta));
+      // Sending the X-Content-Type-Options response header with the value
+      // nosniff will prevent Internet Explorer from MIME-sniffing a response
+      // away from the declared content-type.
+      response.header("X-Content-Type-Options", "nosniff");
+      return response.build();
+    } catch (Exception ex) {
+      // Any failure above is reported to the client as a server error
+      // carrying the exception message.
+      return createBadResponse(Status.INTERNAL_SERVER_ERROR,
+          ex.getMessage());
+    }
+  }
\ No newline at end of file

diff --git 
index c575e70..6faf642 100644
@@ -66,6 +66,7 @@ import 
 import org.apache.hadoop.yarn.server.timeline.TimelineDataManager;
 import org.apache.hadoop.yarn.server.timeline.TimelineStore;
 import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
+import org.apache.hadoop.yarn.server.webapp.dao.ContainerLogsInfo;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
@@ -563,6 +564,17 @@ public class TestAHSWebServices extends JerseyTestBase {
     String responseText = response.getEntity(String.class);
     assertTrue(responseText.contains("Hello." + containerId1));
+    // Do the same test with new API
+    r = resource();
+    response = r.path("ws").path("v1")
+        .path("applicationhistory").path("containers")
+        .path(containerId1.toString()).path("logs").path(fileName)
+        .queryParam("user.name", user)
+        .accept(MediaType.TEXT_PLAIN)
+        .get(ClientResponse.class);
+    responseText = response.getEntity(String.class);
+    assertTrue(responseText.contains("Hello." + containerId1));
     // test whether we can find container log from remote diretory if
     // the containerInfo for this container could not be fetched from AHS.
     r = resource();
@@ -575,6 +587,17 @@ public class TestAHSWebServices extends JerseyTestBase {
     responseText = response.getEntity(String.class);
     assertTrue(responseText.contains("Hello." + containerId100));
+    // Do the same test with new API
+    r = resource();
+    response = r.path("ws").path("v1")
+        .path("applicationhistory").path("containers")
+        .path(containerId100.toString()).path("logs").path(fileName)
+        .queryParam("user.name", user)
+        .accept(MediaType.TEXT_PLAIN)
+        .get(ClientResponse.class);
+    responseText = response.getEntity(String.class);
+    assertTrue(responseText.contains("Hello." + containerId100));
     // create an application which can not be found from AHS
     ApplicationId appId100 = ApplicationId.newInstance(0, 100);
     appLogsDir = new Path(rootLogDirPath, appId100.toString());
@@ -723,6 +746,95 @@ public class TestAHSWebServices extends JerseyTestBase {
     assertTrue(redirectURL.contains("/logs/" + fileName));
     assertTrue(redirectURL.contains("user.name=" + user));
+    // Test with new API
+    requestURI = r.path("ws").path("v1")
+        .path("applicationhistory").path("containers")
+        .path(containerId1.toString()).path("logs").path(fileName)
+        .queryParam("user.name", user).getURI();
+    redirectURL = getRedirectURL(requestURI.toString());
+    assertTrue(redirectURL != null);
+    assertTrue(redirectURL.contains("test:1234"));
+    assertTrue(redirectURL.contains("ws/v1/node/containers"));
+    assertTrue(redirectURL.contains(containerId1.toString()));
+    assertTrue(redirectURL.contains("/logs/" + fileName));
+    assertTrue(redirectURL.contains("user.name=" + user));
+  }
+  // Requests the log listing of a container of application (1234, 1) and
+  // checks that the request is redirected to a URL naming the node address,
+  // the NodeManager containers path, the container ID and "/logs".
+  // Assumes the test's history data reports this application as running
+  // (set up outside this method -- confirm).
+  @Test(timeout = 10000)
+  public void testContainerLogsMetaForRunningApps() throws Exception {
+    String user = "user1";
+    ApplicationId appId = ApplicationId.newInstance(
+        1234, 1);
+    ApplicationAttemptId appAttemptId =
+        ApplicationAttemptId.newInstance(appId, 1);
+    ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1);
+    WebResource r = resource();
+    // Only the URI is built here; getRedirectURL performs the request.
+    URI requestURI = r.path("ws").path("v1")
+        .path("applicationhistory").path("containers")
+        .path(containerId1.toString()).path("logs")
+        .queryParam("user.name", user).getURI();
+    String redirectURL = getRedirectURL(requestURI.toString());
+    assertTrue(redirectURL != null);
+    assertTrue(redirectURL.contains("test:1234"));
+    assertTrue(redirectURL.contains("ws/v1/node/containers"));
+    assertTrue(redirectURL.contains(containerId1.toString()));
+    assertTrue(redirectURL.contains("/logs"));
+  }
+  // Writes one "syslog" file for a container of application (0, 1), uploads
+  // it to the remote log directory, then requests the log listing as JSON
+  // and checks that it holds exactly that file with the size of the content.
+  // Assumes the test's history data reports this application as finished
+  // (set up outside this method -- confirm).
+  @Test(timeout = 10000)
+  public void testContainerLogsMetaForFinishedApps() throws Exception {
+    String fileName = "syslog";
+    String user = "user1";
+    UserGroupInformation ugi = UserGroupInformation.createRemoteUser("user1");
+    NodeId nodeId = NodeId.newInstance("test host", 100);
+    //prepare the logs for remote directory
+    ApplicationId appId = ApplicationId.newInstance(0, 1);
+    // create local logs
+    List<String> rootLogDirList = new ArrayList<String>();
+    rootLogDirList.add(rootLogDir);
+    Path rootLogDirPath = new Path(rootLogDir);
+    // Start from an empty local log root.
+    if (fs.exists(rootLogDirPath)) {
+      fs.delete(rootLogDirPath, true);
+    }
+    assertTrue(fs.mkdirs(rootLogDirPath));
+    Path appLogsDir = new Path(rootLogDirPath, appId.toString());
+    if (fs.exists(appLogsDir)) {
+      fs.delete(appLogsDir, true);
+    }
+    assertTrue(fs.mkdirs(appLogsDir));
+    ApplicationAttemptId appAttemptId =
+        ApplicationAttemptId.newInstance(appId, 1);
+    ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1);
+    String content = "Hello." + containerId1;
+    createContainerLogInLocalDir(appLogsDir, containerId1, fs, fileName,
+        content);
+    // upload container logs to remote log dir
+    // NOTE(review): the path is built by plain concatenation, so the
+    // configured remote log dir must end with a separator -- confirm.
+    Path path = new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR) +
+        user + "/logs/" + appId.toString());
+    if (fs.exists(path)) {
+      fs.delete(path, true);
+    }
+    assertTrue(fs.mkdirs(path));
+    uploadContainerLogIntoRemoteDir(ugi, conf, rootLogDirList, nodeId,
+        containerId1, path, fs);
+    WebResource r = resource();
+    ClientResponse response = r.path("ws").path("v1")
+        .path("applicationhistory").path("containers")
+        .path(containerId1.toString()).path("logs")
+        .queryParam("user.name", user)
+        .accept(MediaType.APPLICATION_JSON)
+        .get(ClientResponse.class);
+    // Despite its name, responseText holds the unmarshalled listing.
+    ContainerLogsInfo responseText = response.getEntity(
+        ContainerLogsInfo.class);
+    // NOTE(review): if this is JUnit's assertEquals, it takes
+    // (expected, actual); the arguments below are in the opposite order,
+    // which only affects the wording of a failure message.
+    assertEquals(responseText.getContainerLogsInfo().size(), 1);
+    assertEquals(responseText.getContainerLogsInfo().get(0).getFileName(),
+        fileName);
+    assertEquals(responseText.getContainerLogsInfo().get(0).getFileSize(),
+        String.valueOf(content.length()));
   private static String getRedirectURL(String url) {

diff --git 
new file mode 100644
index 0000000..6e405d4
--- /dev/null
@@ -0,0 +1,101 @@
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.webapp.dao;
+import com.google.common.annotations.VisibleForTesting;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+ * {@code ContainerLogsInfo} includes the log meta-data of containers.
+ * <p>
+ * The container log meta-data includes details such as:
+ * <ul>
+ *   <li>The filename of the container log.</li>
+ *   <li>The size of the container log.</li>
+ * </ul>
+ */
+// Serialized under the root element name "containerLogsInfo"; each list
+// entry is written as a "containerLogInfo" element.
+@XmlRootElement(name = "containerLogsInfo")
+public class ContainerLogsInfo {
+  // One entry per log file; stays null when the no-arg constructor is used.
+  @XmlElement(name = "containerLogInfo")
+  protected List<ContainerLogInfo> containerLogsInfo;
+  //JAXB needs this
+  public ContainerLogsInfo() {}
+  // Builds one ContainerLogInfo per map entry, using the key as the file
+  // name and the value as the file size. Entry order follows the map's
+  // iteration order.
+  // NOTE(review): nothing in the visible body throws YarnException; the
+  // throws clause may be removable -- confirm against callers.
+  public ContainerLogsInfo(Map<String, String> containerLogMeta)
+      throws YarnException {
+    this.containerLogsInfo = new ArrayList<ContainerLogInfo>();
+    for (Entry<String, String> meta : containerLogMeta.entrySet()) {
+      ContainerLogInfo info = new ContainerLogInfo(meta.getKey(),
+          meta.getValue());
+      containerLogsInfo.add(info);
+    }
+  }
+  // Returns the internal list itself, not a copy.
+  public List<ContainerLogInfo> getContainerLogsInfo() {
+    return this.containerLogsInfo;
+  }
+  /**
+   * It includes the log meta-data of a container.
+   * <p>
+   * Holds the name and the size of a single log file; both are kept as
+   * strings.
+   */
+  @Private
+  @VisibleForTesting
+  public static class ContainerLogInfo {
+    private String fileName;
+    private String fileSize;
+    //JAXB needs this
+    public ContainerLogInfo() {}
+    public ContainerLogInfo(String fileName, String fileSize) {
+      this.setFileName(fileName);
+      this.setFileSize(fileSize);
+    }
+    public String getFileName() {
+      return fileName;
+    }
+    public void setFileName(String fileName) {
+      this.fileName = fileName;
+    }
+    public String getFileSize() {
+      return fileSize;
+    }
+    public void setFileSize(String fileSize) {
+      this.fileSize = fileSize;
+    }
+  }

To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to