Repository: flink
Updated Branches:
  refs/heads/master 8e429ced2 -> 84790ef62


[FLINK-1843][jobmanager] remove SoftReferences on archived ExecutionGraphs

The previously introduced SoftReferences to store archived
ExecutionGraphs cleared old graphs in a non-transparent order.

This closes #639.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/84790ef6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/84790ef6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/84790ef6

Branch: refs/heads/master
Commit: 84790ef627651c884ebab4ee4269d33d6990a3ca
Parents: 8e429ce
Author: Maximilian Michels <m...@apache.org>
Authored: Wed Apr 29 12:34:31 2015 +0200
Committer: Maximilian Michels <m...@apache.org>
Committed: Thu Apr 30 10:29:44 2015 +0200

----------------------------------------------------------------------
 .../flink/runtime/jobmanager/JobManager.scala   |  4 +--
 .../runtime/jobmanager/MemoryArchivist.scala    | 33 ++++++--------------
 .../runtime/messages/ArchiveMessages.scala      |  8 +++++
 .../testingUtils/TestingMemoryArchivist.scala   |  2 +-
 4 files changed, 20 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/84790ef6/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 83f9e35..15a9446 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -292,7 +292,7 @@ class JobManager(val flinkConfiguration: Configuration,
           if (newJobStatus.isTerminalState) {
             jobInfo.end = timeStamp
 
-          // is the client waiting for the job result?
+            // is the client waiting for the job result?
             newJobStatus match {
               case JobStatus.FINISHED =>
                 val accumulatorResults: java.util.Map[String, 
SerializedValue[AnyRef]] = try {
@@ -326,7 +326,7 @@ class JobManager(val flinkConfiguration: Configuration,
           }
         case None =>
           removeJob(jobID)
-          }
+      }
 
     case msg: BarrierAck =>
       currentJobs.get(msg.jobID) match {

http://git-wip-us.apache.org/repos/asf/flink/blob/84790ef6/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
index 9e71ebb..62ea435 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.messages.ArchiveMessages._
 import org.apache.flink.runtime.messages.JobManagerMessages._
 
 import scala.collection.mutable
-import scala.ref.SoftReference
 
 /**
  * Actor which stores terminated Flink jobs. The number of stored Flink jobs 
is set by max_entries.
@@ -56,28 +55,31 @@ class MemoryArchivist(private val max_entries: Int)
    * Map of execution graphs belonging to recently started jobs with the time 
stamp of the last
    * received job event. The insert order is preserved through a LinkedHashMap.
    */
-  val graphs = mutable.LinkedHashMap[JobID, SoftReference[ExecutionGraph]]()
+  val graphs = mutable.LinkedHashMap[JobID, ExecutionGraph]()
 
   override def receiveWithLogMessages: Receive = {
     
     /* Receive Execution Graph to archive */
     case ArchiveExecutionGraph(jobID, graph) => 
      // store the graph directly (SoftReferences were removed in FLINK-1843)
-      graphs.update(jobID, new SoftReference(graph))
-
+      graphs.update(jobID, graph)
       trimHistory()
 
+    case RequestArchivedJob(jobID: JobID) =>
+      val graph = graphs.get(jobID)
+      sender ! ArchivedJob(graph)
+
     case RequestArchivedJobs =>
-      sender ! ArchivedJobs(getAllGraphs)
+      sender ! ArchivedJobs(graphs.values)
 
     case RequestJob(jobID) =>
-      getGraph(jobID) match {
+      graphs.get(jobID) match {
         case Some(graph) => sender ! JobFound(jobID, graph)
         case None => sender ! JobNotFound(jobID)
       }
 
     case RequestJobStatus(jobID) =>
-      getGraph(jobID) match {
+      graphs.get(jobID) match {
         case Some(graph) => sender ! CurrentJobStatus(jobID, graph.getState)
         case None => sender ! JobNotFound(jobID)
       }
@@ -92,23 +94,6 @@ class MemoryArchivist(private val max_entries: Int)
   }
 
   /**
-   * Gets all graphs that have not been garbage collected.
-   * @return An iterable with all valid ExecutionGraphs
-   */
-  protected def getAllGraphs: Iterable[ExecutionGraph] = 
graphs.values.flatMap(_.get)
-
-  /**
-   * Gets a graph with a jobID if it has not been garbage collected.
-   * @param jobID
-   * @return ExecutionGraph or null
-   */
-  protected def getGraph(jobID: JobID): Option[ExecutionGraph] = 
graphs.get(jobID) match {
-    case Some(softRef) => softRef.get
-    case None => None
-  }
-  
-
-  /**
    * Remove old ExecutionGraphs belonging to a jobID
   * if more than max_entries are in the queue.
    */

http://git-wip-us.apache.org/repos/asf/flink/blob/84790ef6/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala
index b4ed2cc..e9e7dec 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala
@@ -34,6 +34,14 @@ object ArchiveMessages {
   case object RequestArchivedJobs
 
   /**
+   * Request a specific ExecutionGraph by JobID. The response is 
[[ArchivedJob]]
+   * @param jobID
+   */
+  case class RequestArchivedJob(jobID: JobID)
+
+  case class ArchivedJob(job: Option[ExecutionGraph])
+
+  /**
    * Response to [[RequestArchivedJobs]] message. The response contains the 
archived jobs.
    * @param jobs
    */

http://git-wip-us.apache.org/repos/asf/flink/blob/84790ef6/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
index 88d3cd0..b8d1217 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
@@ -34,7 +34,7 @@ trait TestingMemoryArchivist extends ActorLogMessages {
 
   def receiveTestingMessages: Receive = {
     case RequestExecutionGraph(jobID) =>
-      val executionGraph = getGraph(jobID)
+      val executionGraph = graphs.get(jobID)
       
       executionGraph match {
         case Some(graph) => sender ! ExecutionGraphFound(jobID, graph)

Reply via email to