Repository: spark
Updated Branches:
  refs/heads/master 8fdd48959 -> 6e74edeca


[SPARK-2458] Make failed application log visible on History Server

Enabled HistoryServer to show incomplete applications.
We can see the log for incomplete applications by clicking the bottom link.

Author: Masayoshi TSUZUKI <tsudu...@oss.nttdata.co.jp>

Closes #3467 from tsudukim/feature/SPARK-2458-2 and squashes the following 
commits:

76205d2 [Masayoshi TSUZUKI] Fixed and added test code.
29a04a9 [Masayoshi TSUZUKI] Merge branch 'master' of github.com:tsudukim/spark 
into feature/SPARK-2458-2
f9ef854 [Masayoshi TSUZUKI] Added space between "if" and "(". Fixed 
"Incomplete" as capitalized in the web UI. Modified double negative variable 
name.
9b465b0 [Masayoshi TSUZUKI] Modified typo and better implementation.
3ed8a41 [Masayoshi TSUZUKI] Modified too long lines.
08ea14d [Masayoshi TSUZUKI] [SPARK-2458] Make failed application log visible on 
History Server


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6e74edec
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6e74edec
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6e74edec

Branch: refs/heads/master
Commit: 6e74edeca31acd7dc84a34402e430e017591d858
Parents: 8fdd489
Author: Masayoshi TSUZUKI <tsudu...@oss.nttdata.co.jp>
Authored: Wed Jan 7 07:32:16 2015 -0800
Committer: Andrew Or <and...@databricks.com>
Committed: Wed Jan 7 07:32:53 2015 -0800

----------------------------------------------------------------------
 .../history/ApplicationHistoryProvider.scala    |  3 +-
 .../deploy/history/FsHistoryProvider.scala      | 38 +++++++------
 .../spark/deploy/history/HistoryPage.scala      | 60 +++++++++++++-------
 .../deploy/history/FsHistoryProviderSuite.scala | 14 +++--
 4 files changed, 72 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6e74edec/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
 
b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
index fbe39b2..553bf3c 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
@@ -25,7 +25,8 @@ private[spark] case class ApplicationHistoryInfo(
     startTime: Long,
     endTime: Long,
     lastUpdated: Long,
-    sparkUser: String)
+    sparkUser: String,
+    completed: Boolean = false)
 
 private[spark] abstract class ApplicationHistoryProvider {
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6e74edec/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 792d15b..2b084a2 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -173,20 +173,9 @@ private[history] class FsHistoryProvider(conf: SparkConf) 
extends ApplicationHis
       val logInfos = statusList
         .filter { entry =>
           try {
-            val isFinishedApplication =
-              if (isLegacyLogDirectory(entry)) {
-                fs.exists(new Path(entry.getPath(), APPLICATION_COMPLETE))
-              } else {
-                
!entry.getPath().getName().endsWith(EventLoggingListener.IN_PROGRESS)
-              }
-
-            if (isFinishedApplication) {
-              val modTime = getModificationTime(entry)
-              newLastModifiedTime = math.max(newLastModifiedTime, modTime)
-              modTime >= lastModifiedTime
-            } else {
-              false
-            }
+            val modTime = getModificationTime(entry)
+            newLastModifiedTime = math.max(newLastModifiedTime, modTime)
+            modTime >= lastModifiedTime
           } catch {
             case e: AccessControlException =>
               // Do not use "logInfo" since these messages can get pretty 
noisy if printed on
@@ -204,7 +193,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) 
extends ApplicationHis
               None
           }
         }
-        .sortBy { info => -info.endTime }
+        .sortBy { info => (-info.endTime, -info.startTime) }
 
       lastModifiedTime = newLastModifiedTime
 
@@ -261,7 +250,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) 
extends ApplicationHis
         appListener.startTime.getOrElse(-1L),
         appListener.endTime.getOrElse(-1L),
         getModificationTime(eventLog),
-        appListener.sparkUser.getOrElse(NOT_STARTED))
+        appListener.sparkUser.getOrElse(NOT_STARTED),
+        isApplicationCompleted(eventLog))
     } finally {
       logInput.close()
     }
@@ -329,6 +319,17 @@ private[history] class FsHistoryProvider(conf: SparkConf) 
extends ApplicationHis
   /** Returns the system's mononotically increasing time. */
   private def getMonotonicTimeMs(): Long = System.nanoTime() / (1000 * 1000)
 
+  /**
+   * Return true when the application has completed.
+   */
+  private def isApplicationCompleted(entry: FileStatus): Boolean = {
+    if (isLegacyLogDirectory(entry)) {
+      fs.exists(new Path(entry.getPath(), APPLICATION_COMPLETE))
+    } else {
+      !entry.getPath().getName().endsWith(EventLoggingListener.IN_PROGRESS)
+    }
+  }
+
 }
 
 private object FsHistoryProvider {
@@ -342,5 +343,6 @@ private class FsApplicationHistoryInfo(
     startTime: Long,
     endTime: Long,
     lastUpdated: Long,
-    sparkUser: String)
-  extends ApplicationHistoryInfo(id, name, startTime, endTime, lastUpdated, 
sparkUser)
+    sparkUser: String,
+    completed: Boolean = true)
+  extends ApplicationHistoryInfo(id, name, startTime, endTime, lastUpdated, 
sparkUser, completed)

http://git-wip-us.apache.org/repos/asf/spark/blob/6e74edec/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala 
b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
index 0d5dcfb..e4e7bc2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
@@ -31,8 +31,10 @@ private[spark] class HistoryPage(parent: HistoryServer) 
extends WebUIPage("") {
   def render(request: HttpServletRequest): Seq[Node] = {
     val requestedPage = 
Option(request.getParameter("page")).getOrElse("1").toInt
     val requestedFirst = (requestedPage - 1) * pageSize
+    val requestedIncomplete =
+      
Option(request.getParameter("showIncomplete")).getOrElse("false").toBoolean
 
-    val allApps = parent.getApplicationList()
+    val allApps = parent.getApplicationList().filter(_.completed != 
requestedIncomplete)
     val actualFirst = if (requestedFirst < allApps.size) requestedFirst else 0
     val apps = allApps.slice(actualFirst, Math.min(actualFirst + pageSize, 
allApps.size))
 
@@ -65,25 +67,26 @@ private[spark] class HistoryPage(parent: HistoryServer) 
extends WebUIPage("") {
 
               <h4>
                 Showing {actualFirst + 1}-{last + 1} of {allApps.size}
-                  <span style="float: right">
-                    {
-                      if (actualPage > 1) {
-                        <a href={"/?page=" + (actualPage - 1)}>&lt; </a>
-                        <a href={"/?page=1"}>1</a>
-                      }
+                {if (requestedIncomplete) "(Incomplete applications)"}
+                <span style="float: right">
+                  {
+                    if (actualPage > 1) {
+                      <a href={makePageLink(actualPage - 1, 
requestedIncomplete)}>&lt; </a>
+                      <a href={makePageLink(1, requestedIncomplete)}>1</a>
                     }
-                    {if (actualPage - plusOrMinus > secondPageFromLeft) " ... 
"}
-                    {leftSideIndices}
-                    {actualPage}
-                    {rightSideIndices}
-                    {if (actualPage + plusOrMinus < secondPageFromRight) " ... 
"}
-                    {
-                      if (actualPage < pageCount) {
-                        <a href={"/?page=" + pageCount}>{pageCount}</a>
-                        <a href={"/?page=" + (actualPage + 1)}> &gt;</a>
-                      }
+                  }
+                  {if (actualPage - plusOrMinus > secondPageFromLeft) " ... "}
+                  {leftSideIndices}
+                  {actualPage}
+                  {rightSideIndices}
+                  {if (actualPage + plusOrMinus < secondPageFromRight) " ... "}
+                  {
+                    if (actualPage < pageCount) {
+                      <a href={makePageLink(pageCount, 
requestedIncomplete)}>{pageCount}</a>
+                      <a href={makePageLink(actualPage + 1, 
requestedIncomplete)}> &gt;</a>
                     }
-                  </span>
+                  }
+                </span>
               </h4> ++
               appTable
             } else {
@@ -96,6 +99,15 @@ private[spark] class HistoryPage(parent: HistoryServer) 
extends WebUIPage("") {
               </p>
             }
           }
+          <a href={makePageLink(actualPage, !requestedIncomplete)}>
+            {
+              if (requestedIncomplete) {
+                "Back to completed applications"
+              } else {
+                "Show incomplete applications"
+              }
+            }
+          </a>
         </div>
       </div>
     UIUtils.basicSparkPage(content, "History Server")
@@ -117,8 +129,9 @@ private[spark] class HistoryPage(parent: HistoryServer) 
extends WebUIPage("") {
   private def appRow(info: ApplicationHistoryInfo): Seq[Node] = {
     val uiAddress = HistoryServer.UI_PATH_PREFIX + s"/${info.id}"
     val startTime = UIUtils.formatDate(info.startTime)
-    val endTime = UIUtils.formatDate(info.endTime)
-    val duration = UIUtils.formatDuration(info.endTime - info.startTime)
+    val endTime = if (info.endTime > 0) UIUtils.formatDate(info.endTime) else 
"-"
+    val duration =
+      if (info.endTime > 0) UIUtils.formatDuration(info.endTime - 
info.startTime) else "-"
     val lastUpdated = UIUtils.formatDate(info.lastUpdated)
     <tr>
       <td><a href={uiAddress}>{info.id}</a></td>
@@ -130,4 +143,11 @@ private[spark] class HistoryPage(parent: HistoryServer) 
extends WebUIPage("") {
       <td sorttable_customkey={info.lastUpdated.toString}>{lastUpdated}</td>
     </tr>
   }
+
+  private def makePageLink(linkPage: Int, showIncomplete: Boolean): String = {
+    "/?" + Array(
+      "page=" + linkPage,
+      "showIncomplete=" + showIncomplete
+    ).mkString("&")
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6e74edec/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
 
b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index d719e93..8379883 100644
--- 
a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -64,7 +64,8 @@ class FsHistoryProviderSuite extends FunSuite with 
BeforeAndAfter with Matchers
       )
 
     // Write an unfinished app, new-style.
-    writeFile(new File(testDir, "new2" + EventLoggingListener.IN_PROGRESS), 
true, None,
+    val logFile2 = new File(testDir, "new2" + EventLoggingListener.IN_PROGRESS)
+    writeFile(logFile2, true, None,
       SparkListenerApplicationStart("app2-2", None, 1L, "test")
       )
 
@@ -92,12 +93,17 @@ class FsHistoryProviderSuite extends FunSuite with 
BeforeAndAfter with Matchers
 
     val list = provider.getListing().toSeq
     list should not be (null)
-    list.size should be (2)
+    list.size should be (4)
+    list.count(e => e.completed) should be (2)
 
     list(0) should be (ApplicationHistoryInfo(oldLog.getName(), "app3", 2L, 3L,
-      oldLog.lastModified(), "test"))
+      oldLog.lastModified(), "test", true))
     list(1) should be (ApplicationHistoryInfo(logFile1.getName(), "app1-1", 
1L, 2L,
-      logFile1.lastModified(), "test"))
+      logFile1.lastModified(), "test", true))
+    list(2) should be (ApplicationHistoryInfo(oldLog2.getName(), "app4", 2L, 
-1L,
+      oldLog2.lastModified(), "test", false))
+     list(3) should be (ApplicationHistoryInfo(logFile2.getName(), "app2-2", 
1L, -1L,
+      logFile2.lastModified(), "test", false))
 
     // Make sure the UI can be rendered.
     list.foreach { case info =>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to