This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 52c92ad  [SPARK-30346][CORE] Improve logging when events dropped
52c92ad is described below

commit 52c92ad31ea7460c2b0957e7ec54e41c6c6cf7d1
Author: Liupengcheng <liupengch...@xiaomi.com>
AuthorDate: Mon Feb 17 20:16:31 2020 +0800

    [SPARK-30346][CORE] Improve logging when events dropped
    
    ### What changes were proposed in this pull request?
    
    Make the logging of dropped events every 60s work reliably. The original 
implementation sometimes did not log because subsequent events kept arriving and 
updating the droppedEventsCounter.
    
    ### Why are the changes needed?
    
    Currently, the logging may be skipped and delayed for a long time under high 
concurrency, which makes debugging hard. This PR tries to fix that.
    
    ### Does this PR introduce any user-facing change?
    
    No
    
    ### How was this patch tested?
    
    NA
    
    Closes #27002 from 
liupc/Improve-logging-dropped-events-and-logging-threadDump.
    
    Authored-by: Liupengcheng <liupengch...@xiaomi.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../apache/spark/scheduler/AsyncEventQueue.scala   | 36 ++++++++++++----------
 1 file changed, 19 insertions(+), 17 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala 
b/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala
index 1bcddac..5164c30 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala
@@ -64,11 +64,14 @@ private class AsyncEventQueue(
   // processed (instead of just dequeued).
   private val eventCount = new AtomicLong()
 
-  /** A counter for dropped events. It will be reset every time we log it. */
+  /** A counter for dropped events. */
   private val droppedEventsCounter = new AtomicLong(0L)
 
+  /** A counter to keep number of dropped events last time it was logged */
+  @volatile private var lastDroppedEventsCounter: Long = 0L
+
   /** When `droppedEventsCounter` was logged last time in milliseconds. */
-  @volatile private var lastReportTimestamp = 0L
+  private val lastReportTimestamp = new AtomicLong(0L)
 
   private val logDroppedEvent = new AtomicBoolean(false)
 
@@ -167,21 +170,19 @@ private class AsyncEventQueue(
     }
     logTrace(s"Dropping event $event")
 
-    val droppedCount = droppedEventsCounter.get
-    if (droppedCount > 0) {
-      // Don't log too frequently
-      if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
-        // There may be multiple threads trying to decrease 
droppedEventsCounter.
-        // Use "compareAndSet" to make sure only one thread can win.
-        // And if another thread is increasing droppedEventsCounter, 
"compareAndSet" will fail and
-        // then that thread will update it.
-        if (droppedEventsCounter.compareAndSet(droppedCount, 0)) {
-          val prevLastReportTimestamp = lastReportTimestamp
-          lastReportTimestamp = System.currentTimeMillis()
-          val previous = new java.util.Date(prevLastReportTimestamp)
-          logWarning(s"Dropped $droppedCount events from $name since " +
-            s"${if (prevLastReportTimestamp == 0) "the application started" 
else s"$previous"}.")
-        }
+    val droppedEventsCount = droppedEventsCounter.get
+    val droppedCountIncreased = droppedEventsCount - lastDroppedEventsCounter
+    val lastReportTime = lastReportTimestamp.get
+    val curTime = System.currentTimeMillis()
+    // Don't log too frequently
+    if (droppedCountIncreased > 0 && curTime - lastReportTime >= 
LOGGING_INTERVAL) {
+      // There may be multiple threads trying to logging dropped events,
+      // Use 'compareAndSet' to make sure only one thread can win.
+      if (lastReportTimestamp.compareAndSet(lastReportTime, curTime)) {
+        val previous = new java.util.Date(lastReportTime)
+        lastDroppedEventsCounter = droppedEventsCount
+        logWarning(s"Dropped $droppedCountIncreased events from $name since " +
+          s"${if (lastReportTime == 0) "the application started" else 
s"$previous"}.")
       }
     }
   }
@@ -213,4 +214,5 @@ private object AsyncEventQueue {
 
   val POISON_PILL = new SparkListenerEvent() { }
 
+  val LOGGING_INTERVAL = 60 * 1000
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to