Repository: flink Updated Branches: refs/heads/master 8e429ced2 -> 84790ef62
[FLINK-1843][jobmanager] remove SoftReferences on archived ExecutionGraphs The previously introduced SoftReferences to store archived ExecutionGraphs cleared old graphs in a non-transparent order. This closes #639. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/84790ef6 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/84790ef6 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/84790ef6 Branch: refs/heads/master Commit: 84790ef627651c884ebab4ee4269d33d6990a3ca Parents: 8e429ce Author: Maximilian Michels <m...@apache.org> Authored: Wed Apr 29 12:34:31 2015 +0200 Committer: Maximilian Michels <m...@apache.org> Committed: Thu Apr 30 10:29:44 2015 +0200 ---------------------------------------------------------------------- .../flink/runtime/jobmanager/JobManager.scala | 4 +-- .../runtime/jobmanager/MemoryArchivist.scala | 33 ++++++-------------- .../runtime/messages/ArchiveMessages.scala | 8 +++++ .../testingUtils/TestingMemoryArchivist.scala | 2 +- 4 files changed, 20 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/84790ef6/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 83f9e35..15a9446 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -292,7 +292,7 @@ class JobManager(val flinkConfiguration: Configuration, if (newJobStatus.isTerminalState) { jobInfo.end = timeStamp - // is the client waiting for the job result? + // is the client waiting for the job result? 
newJobStatus match { case JobStatus.FINISHED => val accumulatorResults: java.util.Map[String, SerializedValue[AnyRef]] = try { @@ -326,7 +326,7 @@ class JobManager(val flinkConfiguration: Configuration, } case None => removeJob(jobID) - } + } case msg: BarrierAck => currentJobs.get(msg.jobID) match { http://git-wip-us.apache.org/repos/asf/flink/blob/84790ef6/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala index 9e71ebb..62ea435 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala @@ -26,7 +26,6 @@ import org.apache.flink.runtime.messages.ArchiveMessages._ import org.apache.flink.runtime.messages.JobManagerMessages._ import scala.collection.mutable -import scala.ref.SoftReference /** * Actor which stores terminated Flink jobs. The number of stored Flink jobs is set by max_entries. @@ -56,28 +55,31 @@ class MemoryArchivist(private val max_entries: Int) * Map of execution graphs belonging to recently started jobs with the time stamp of the last * received job event. The insert order is preserved through a LinkedHashMap. */ - val graphs = mutable.LinkedHashMap[JobID, SoftReference[ExecutionGraph]]() + val graphs = mutable.LinkedHashMap[JobID, ExecutionGraph]() override def receiveWithLogMessages: Receive = { /* Receive Execution Graph to archive */ case ArchiveExecutionGraph(jobID, graph) => // wrap graph inside a soft reference - graphs.update(jobID, new SoftReference(graph)) - + graphs.update(jobID, graph) trimHistory() + case RequestArchivedJob(jobID: JobID) => + val graph = graphs.get(jobID) + sender ! 
ArchivedJob(graph) + case RequestArchivedJobs => - sender ! ArchivedJobs(getAllGraphs) + sender ! ArchivedJobs(graphs.values) case RequestJob(jobID) => - getGraph(jobID) match { + graphs.get(jobID) match { case Some(graph) => sender ! JobFound(jobID, graph) case None => sender ! JobNotFound(jobID) } case RequestJobStatus(jobID) => - getGraph(jobID) match { + graphs.get(jobID) match { case Some(graph) => sender ! CurrentJobStatus(jobID, graph.getState) case None => sender ! JobNotFound(jobID) } @@ -92,23 +94,6 @@ class MemoryArchivist(private val max_entries: Int) } /** - * Gets all graphs that have not been garbage collected. - * @return An iterable with all valid ExecutionGraphs - */ - protected def getAllGraphs: Iterable[ExecutionGraph] = graphs.values.flatMap(_.get) - - /** - * Gets a graph with a jobID if it has not been garbage collected. - * @param jobID - * @return ExecutionGraph or null - */ - protected def getGraph(jobID: JobID): Option[ExecutionGraph] = graphs.get(jobID) match { - case Some(softRef) => softRef.get - case None => None - } - - - /** * Remove old ExecutionGraphs belonging to a jobID * * if more than max_entries are in the queue. */ http://git-wip-us.apache.org/repos/asf/flink/blob/84790ef6/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala index b4ed2cc..e9e7dec 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala @@ -34,6 +34,14 @@ object ArchiveMessages { case object RequestArchivedJobs /** + * Request a specific archived ExecutionGraph by JobID.
The response is [[ArchivedJob]], which holds None if no graph is archived for the given JobID. + * @param jobID ID of the job whose archived ExecutionGraph is requested + */ + case class RequestArchivedJob(jobID: JobID) + + case class ArchivedJob(job: Option[ExecutionGraph]) + + /** * Response to [[RequestArchivedJobs]] message. The response contains the archived jobs. * @param jobs */ http://git-wip-us.apache.org/repos/asf/flink/blob/84790ef6/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala index 88d3cd0..b8d1217 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala @@ -34,7 +34,7 @@ trait TestingMemoryArchivist extends ActorLogMessages { def receiveTestingMessages: Receive = { case RequestExecutionGraph(jobID) => - val executionGraph = getGraph(jobID) + val executionGraph = graphs.get(jobID) executionGraph match { case Some(graph) => sender ! ExecutionGraphFound(jobID, graph)