[ https://issues.apache.org/jira/browse/SPARK-25429?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Xiao Li resolved SPARK-25429.
-----------------------------
    Resolution: Fixed
      Assignee: Yuming Wang
 Fix Version/s: 2.5.0

> SparkListenerBus inefficient due to 'LiveStageMetrics#accumulatorIds:Array[Long]' data structure
> ------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-25429
>                 URL: https://issues.apache.org/jira/browse/SPARK-25429
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 2.3.1
>            Reporter: DENG FEI
>            Assignee: Yuming Wang
>            Priority: Major
>             Fix For: 2.5.0
>
>
> {code:java}
> private def updateStageMetrics(
>     stageId: Int,
>     attemptId: Int,
>     taskId: Long,
>     accumUpdates: Seq[AccumulableInfo],
>     succeeded: Boolean): Unit = {
>   Option(stageMetrics.get(stageId)).foreach { metrics =>
>     if (metrics.attemptId != attemptId || metrics.accumulatorIds.isEmpty) {
>       return
>     }
>     val oldTaskMetrics = metrics.taskMetrics.get(taskId)
>     if (oldTaskMetrics != null && oldTaskMetrics.succeeded) {
>       return
>     }
>     val updates = accumUpdates
>       .filter { acc => acc.update.isDefined && metrics.accumulatorIds.contains(acc.id) }
>       .sortBy(_.id)
>     if (updates.isEmpty) {
>       return
>     }
>     val ids = new Array[Long](updates.size)
>     val values = new Array[Long](updates.size)
>     updates.zipWithIndex.foreach { case (acc, idx) =>
>       ids(idx) = acc.id
>       // In a live application, accumulators have Long values, but when reading from event
>       // logs, they have String values. For now, assume all accumulators are Long and convert
>       // accordingly.
>       values(idx) = acc.update.get match {
>         case s: String => s.toLong
>         case l: Long => l
>         case o => throw new IllegalArgumentException(s"Unexpected: $o")
>       }
>     }
>     // TODO: storing metrics by task ID can cause metrics for the same task index to be
>     // counted multiple times, for example due to speculation or re-attempts.
>     metrics.taskMetrics.put(taskId, new LiveTaskMetrics(ids, values, succeeded))
>   }
> }
> {code}
> The bottleneck is 'metrics.accumulatorIds.contains(acc.id)'. When a large SQL application
> generates many accumulators, calling Array#contains for every accumulator update is
> inefficient: it is a linear scan over all of the stage's accumulator IDs, so the filter's
> cost grows with (number of updates) x (number of accumulator IDs) on every task.
> In practice, the application may time out while shutting down and then be killed by the
> ResourceManager (RM) in YARN mode.
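One way to remove the linear scan is to keep the stage's accumulator IDs in a hash-based Set[Long], built once per stage, so each membership test is effectively constant time. The Scala script below is a minimal sketch under that assumption, not the committed patch. AccInfo, filterWithArray and filterWithSet are hypothetical names; AccInfo only stands in for the id and update fields of Spark's AccumulableInfo.

```scala
// Hypothetical stand-in for Spark's AccumulableInfo, keeping only the
// two fields the filter reads: the accumulator id and its optional update.
final case class AccInfo(id: Long, update: Option[Any])

// Before: IDs stored in an Array[Long]. Array#contains scans the whole
// array, so filtering n updates against m IDs costs O(n * m).
def filterWithArray(accumulatorIds: Array[Long], updates: Seq[AccInfo]): Seq[AccInfo] =
  updates
    .filter(acc => acc.update.isDefined && accumulatorIds.contains(acc.id))
    .sortBy(_.id)

// After: IDs stored in an immutable Set[Long]. Set#contains is a hash
// lookup, so the same filter costs O(n) once the Set has been built.
def filterWithSet(accumulatorIds: Set[Long], updates: Seq[AccInfo]): Seq[AccInfo] =
  updates
    .filter(acc => acc.update.isDefined && accumulatorIds.contains(acc.id))
    .sortBy(_.id)

// Build the ID collection once per stage (here: 10,000 IDs).
val stageIds: Array[Long] = (0L until 10000L).toArray
val stageIdSet: Set[Long] = stageIds.toSet

// One task's updates, including an unknown ID and an update with no value.
val taskUpdates = Seq(
  AccInfo(42L, Some(1L)),
  AccInfo(20000L, Some(2L)), // not one of this stage's accumulators
  AccInfo(7L, None),         // no update value
  AccInfo(3L, Some("5"))     // event-log style String value
)

val viaArray = filterWithArray(stageIds, taskUpdates)
val viaSet = filterWithSet(stageIdSet, taskUpdates)
println(viaSet.map(_.id)) // List(3, 42)
```

Both filters keep the same updates; only the cost of the membership test differs. The Array version does up to m comparisons per update on every task-end event, while the Set version pays a one-time O(m) conversion per stage and then a hash lookup per update.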