This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 52c92ad [SPARK-30346][CORE] Improve logging when events dropped 52c92ad is described below commit 52c92ad31ea7460c2b0957e7ec54e41c6c6cf7d1 Author: Liupengcheng <liupengch...@xiaomi.com> AuthorDate: Mon Feb 17 20:16:31 2020 +0800 [SPARK-30346][CORE] Improve logging when events dropped ### What changes were proposed in this pull request? Make the logging of dropped events every 60s work correctly. The original implementation sometimes did not log at all, because subsequent events kept arriving and updating the droppedEventsCounter. ### Why are the changes needed? Currently, the logging may be skipped or delayed for a long time under high concurrency, which makes debugging hard. This PR tries to fix that. ### Does this PR introduce any user-facing change? No ### How was this patch tested? NA Closes #27002 from liupc/Improve-logging-dropped-events-and-logging-threadDump. Authored-by: Liupengcheng <liupengch...@xiaomi.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../apache/spark/scheduler/AsyncEventQueue.scala | 36 ++++++++++++---------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala b/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala index 1bcddac..5164c30 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala @@ -64,11 +64,14 @@ private class AsyncEventQueue( // processed (instead of just dequeued). private val eventCount = new AtomicLong() - /** A counter for dropped events. It will be reset every time we log it. */ + /** A counter for dropped events. 
*/ private val droppedEventsCounter = new AtomicLong(0L) + /** A counter to keep number of dropped events last time it was logged */ + @volatile private var lastDroppedEventsCounter: Long = 0L + /** When `droppedEventsCounter` was logged last time in milliseconds. */ - @volatile private var lastReportTimestamp = 0L + private val lastReportTimestamp = new AtomicLong(0L) private val logDroppedEvent = new AtomicBoolean(false) @@ -167,21 +170,19 @@ private class AsyncEventQueue( } logTrace(s"Dropping event $event") - val droppedCount = droppedEventsCounter.get - if (droppedCount > 0) { - // Don't log too frequently - if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) { - // There may be multiple threads trying to decrease droppedEventsCounter. - // Use "compareAndSet" to make sure only one thread can win. - // And if another thread is increasing droppedEventsCounter, "compareAndSet" will fail and - // then that thread will update it. - if (droppedEventsCounter.compareAndSet(droppedCount, 0)) { - val prevLastReportTimestamp = lastReportTimestamp - lastReportTimestamp = System.currentTimeMillis() - val previous = new java.util.Date(prevLastReportTimestamp) - logWarning(s"Dropped $droppedCount events from $name since " + - s"${if (prevLastReportTimestamp == 0) "the application started" else s"$previous"}.") - } + val droppedEventsCount = droppedEventsCounter.get + val droppedCountIncreased = droppedEventsCount - lastDroppedEventsCounter + val lastReportTime = lastReportTimestamp.get + val curTime = System.currentTimeMillis() + // Don't log too frequently + if (droppedCountIncreased > 0 && curTime - lastReportTime >= LOGGING_INTERVAL) { + // There may be multiple threads trying to logging dropped events, + // Use 'compareAndSet' to make sure only one thread can win. 
+ if (lastReportTimestamp.compareAndSet(lastReportTime, curTime)) { + val previous = new java.util.Date(lastReportTime) + lastDroppedEventsCounter = droppedEventsCount + logWarning(s"Dropped $droppedCountIncreased events from $name since " + + s"${if (lastReportTime == 0) "the application started" else s"$previous"}.") } } } @@ -213,4 +214,5 @@ private object AsyncEventQueue { val POISON_PILL = new SparkListenerEvent() { } + val LOGGING_INTERVAL = 60 * 1000 } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org