Repository: spark Updated Branches: refs/heads/master 569e50680 -> a81e336f1
[SPARK-19182][DSTREAM] Optimize the lock in StreamingJobProgressListener to not block UI when generating Streaming jobs ## What changes were proposed in this pull request? When DStreamGraph is generating a job, it will hold a lock and block other APIs. Because StreamingJobProgressListener (numInactiveReceivers, streamName(streamId: Int), streamIds) needs to call DStreamGraph's methods to access some information, the UI may hang if generating a job is very slow (e.g., talking to a slow Kafka cluster to fetch metadata). It's better to optimize the locks in DStreamGraph and StreamingJobProgressListener so that the UI is not blocked by job generation. ## How was this patch tested? Existing unit tests. cc zsxwing Author: uncleGen <husty...@gmail.com> Closes #16601 from uncleGen/SPARK-19182. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a81e336f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a81e336f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a81e336f Branch: refs/heads/master Commit: a81e336f1eddc2c6245d807aae2c81ddc60eabf9 Parents: 569e506 Author: uncleGen <husty...@gmail.com> Authored: Wed Jan 18 10:55:31 2017 -0800 Committer: Shixiong Zhu <shixi...@databricks.com> Committed: Wed Jan 18 10:55:31 2017 -0800 ---------------------------------------------------------------------- .../org/apache/spark/streaming/DStreamGraph.scala | 13 +++++++++---- .../streaming/ui/StreamingJobProgressListener.scala | 8 ++++---- 2 files changed, 13 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/a81e336f/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala index 
54d736e..dce2028 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala @@ -31,12 +31,15 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { private val inputStreams = new ArrayBuffer[InputDStream[_]]() private val outputStreams = new ArrayBuffer[DStream[_]]() + @volatile private var inputStreamNameAndID: Seq[(String, Int)] = Nil + var rememberDuration: Duration = null var checkpointInProgress = false var zeroTime: Time = null var startTime: Time = null var batchDuration: Duration = null + @volatile private var numReceivers: Int = 0 def start(time: Time) { this.synchronized { @@ -45,7 +48,9 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { startTime = time outputStreams.foreach(_.initialize(zeroTime)) outputStreams.foreach(_.remember(rememberDuration)) - outputStreams.foreach(_.validateAtStart) + outputStreams.foreach(_.validateAtStart()) + numReceivers = inputStreams.count(_.isInstanceOf[ReceiverInputDStream[_]]) + inputStreamNameAndID = inputStreams.map(is => (is.name, is.id)) inputStreams.par.foreach(_.start()) } } @@ -106,9 +111,9 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { .toArray } - def getInputStreamName(streamId: Int): Option[String] = synchronized { - inputStreams.find(_.id == streamId).map(_.name) - } + def getNumReceivers: Int = numReceivers + + def getInputStreamNameAndID: Seq[(String, Int)] = inputStreamNameAndID def generateJobs(time: Time): Seq[Job] = { logDebug("Generating jobs for time " + time) http://git-wip-us.apache.org/repos/asf/spark/blob/a81e336f/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala index 95f5821..ed4c1e4 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala @@ -169,7 +169,7 @@ private[spark] class StreamingJobProgressListener(ssc: StreamingContext) } def numInactiveReceivers: Int = { - ssc.graph.getReceiverInputStreams().length - numActiveReceivers + ssc.graph.getNumReceivers - numActiveReceivers } def numTotalCompletedBatches: Long = synchronized { @@ -197,17 +197,17 @@ private[spark] class StreamingJobProgressListener(ssc: StreamingContext) } def retainedCompletedBatches: Seq[BatchUIData] = synchronized { - completedBatchUIData.toSeq + completedBatchUIData.toIndexedSeq } def streamName(streamId: Int): Option[String] = { - ssc.graph.getInputStreamName(streamId) + ssc.graph.getInputStreamNameAndID.find(_._2 == streamId).map(_._1) } /** * Return all InputDStream Ids */ - def streamIds: Seq[Int] = ssc.graph.getInputStreams().map(_.id) + def streamIds: Seq[Int] = ssc.graph.getInputStreamNameAndID.map(_._2) /** * Return all of the record rates for each InputDStream in each batch. The key of the return value --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org