This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ecfdffc  [SPARK-27503][DSTREAM] JobGenerator thread exit for some fatal errors but application keeps running
ecfdffc is described below

commit ecfdffcb3560e21ccd318de6a0c614fa0c3aabf5
Author: uncleGen <husty...@gmail.com>
AuthorDate: Tue Apr 23 07:11:58 2019 -0700

    [SPARK-27503][DSTREAM] JobGenerator thread exit for some fatal errors but application keeps running
    
    ## What changes were proposed in this pull request?
    
    In some corner cases, the `JobGenerator` thread (and other `EventLoop` threads) may exit because of a fatal error, such as an OOM, while the Spark Streaming job keeps running without generating any batch jobs. Currently, only non-fatal errors are caught and reported:
    
    ```
    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take()
          try {
            onReceive(event)
          } catch {
            case NonFatal(e) =>
              try {
                onError(e)
              } catch {
                case NonFatal(e) => logError("Unexpected error in " + name, e)
              }
          }
        }
      } catch {
        case ie: InterruptedException => // exit even if eventQueue is not empty
        case NonFatal(e) => logError("Unexpected error in " + name, e)
        // Fatal errors (e.g. OutOfMemoryError) match no case, escape run() and end the thread.
      }
    }
    ```
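
    To make the failure mode concrete, the following is a hypothetical, stripped-down reproduction (not Spark code; `FatalErrorDemo`, `reproduce` and `isNonFatal` are illustrative names). A fatal error is simulated by throwing `OutOfMemoryError` by hand. Because `scala.util.control.NonFatal` does not match `VirtualMachineError` subclasses such as `OutOfMemoryError`, the error escapes both `catch` blocks, the thread terminates, and `put` keeps accepting events that nothing will ever consume.

    ```scala
    import java.util.concurrent.LinkedBlockingQueue
    import scala.util.control.NonFatal

    // Hypothetical reproduction of the failure mode; not Spark's EventLoop.
    object FatalErrorDemo {
      // True only for throwables that a `case NonFatal(e)` clause would catch.
      def isNonFatal(t: Throwable): Boolean = t match {
        case NonFatal(_) => true
        case _           => false
      }

      // Returns (is the loop thread still alive?, number of unconsumed events).
      def reproduce(): (Boolean, Int) = {
        val queue = new LinkedBlockingQueue[String]()
        val worker = new Thread("demo-event-loop") {
          override def run(): Unit = {
            try {
              while (true) {
                val event = queue.take()
                try {
                  // Simulated fatal error: thrown by hand, not a real OOM.
                  if (event == "fatal") throw new OutOfMemoryError("simulated")
                } catch {
                  case NonFatal(_) => () // non-fatal errors are swallowed; the loop continues
                }
              }
            } catch {
              case _: InterruptedException => ()
              case NonFatal(_)             => ()
            }
          }
        }
        // Silence the default handler, which would print the stack trace to stderr.
        worker.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
          override def uncaughtException(t: Thread, e: Throwable): Unit = ()
        })
        worker.setDaemon(true)
        worker.start()
        queue.put("fatal")
        worker.join(5000) // returns once the thread has died
        // The thread is gone, yet put() still succeeds: these events are never consumed.
        queue.put("batch-1")
        queue.put("batch-2")
        (worker.isAlive, queue.size)
      }
    }
    ```

    Calling `FatalErrorDemo.reproduce()` returns `(false, 2)`: the loop thread is dead while two events sit unprocessed in the queue, much like a streaming job that keeps running without generating batches.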
    
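    In this PR, `post` double-checks that the event thread is still alive before putting an event into the queue. If the thread has died, `onError` is called with an `IllegalStateException` instead of silently queueing the event; events posted after the loop has been stopped are dropped.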
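
    A minimal sketch of the guarded `post`, assuming a much simpler loop than Spark's `EventLoop` (no lifecycle hooks, no nested error handling around `onError`). `SketchEventLoop`, `DemoLoop`, `threadAlive` and `joinThread` are hypothetical names; only the body of `post` follows the diff in this commit.

    ```scala
    import java.util.concurrent.{BlockingQueue, ConcurrentLinkedQueue, LinkedBlockingDeque}
    import java.util.concurrent.atomic.AtomicBoolean
    import scala.util.control.NonFatal

    // Hypothetical, simplified event loop; only post() follows the patch.
    abstract class SketchEventLoop[E](name: String) {
      private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()
      private val stopped = new AtomicBoolean(false)

      private val eventThread = new Thread(name) {
        setDaemon(true)
        override def run(): Unit = {
          try {
            while (!stopped.get) {
              val event = eventQueue.take()
              // Only non-fatal errors reach onError; fatal ones end the thread.
              try { onReceive(event) } catch { case NonFatal(e) => onError(e) }
            }
          } catch {
            case _: InterruptedException => // exit even if eventQueue is not empty
          }
        }
      }

      def start(): Unit = eventThread.start()

      def stop(): Unit =
        if (stopped.compareAndSet(false, true)) {
          eventThread.interrupt()
          eventThread.join()
        }

      // Guarded post: drop events once stopped; if the thread died unexpectedly,
      // report it through onError instead of queueing an event nobody will take.
      def post(event: E): Unit = {
        if (!stopped.get) {
          if (eventThread.isAlive) {
            eventQueue.put(event)
          } else {
            onError(new IllegalStateException(s"$name has already been stopped accidentally."))
          }
        }
      }

      // Hypothetical helpers, used only to observe the sketch.
      def threadAlive: Boolean = eventThread.isAlive
      def joinThread(millis: Long): Unit = eventThread.join(millis)

      protected def onReceive(event: E): Unit
      protected def onError(e: Throwable): Unit
    }

    // Hypothetical loop whose handler hits a simulated fatal error on "fatal".
    class DemoLoop extends SketchEventLoop[String]("demo-loop") {
      val errors = new ConcurrentLinkedQueue[Throwable]()
      override protected def onReceive(event: String): Unit =
        if (event == "fatal") throw new OutOfMemoryError("simulated")
      override protected def onError(e: Throwable): Unit = { errors.add(e); () }
    }
    ```

    Usage: after `start()`, posting `"ok"` and then `"fatal"` kills the thread (the JVM's default uncaught-exception handler prints the simulated error). A later `post` then reaches `onError` as an `IllegalStateException` rather than being queued. Call `start()` before `post`, since `Thread.isAlive` is also `false` for a thread that has not been started yet.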
    
    ## How was this patch tested?
    
    Existing unit tests.
    
    Closes #24400 from uncleGen/SPARK-27503.
    
    Authored-by: uncleGen <husty...@gmail.com>
    Signed-off-by: Sean Owen <sean.o...@databricks.com>
---
 core/src/main/scala/org/apache/spark/util/EventLoop.scala | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/util/EventLoop.scala b/core/src/main/scala/org/apache/spark/util/EventLoop.scala
index 651ea49..5125adc 100644
--- a/core/src/main/scala/org/apache/spark/util/EventLoop.scala
+++ b/core/src/main/scala/org/apache/spark/util/EventLoop.scala
@@ -100,7 +100,13 @@ private[spark] abstract class EventLoop[E](name: String) extends Logging {
   * Put the event into the event queue. The event thread will process it later.
    */
   def post(event: E): Unit = {
-    eventQueue.put(event)
+    if (!stopped.get) {
+      if (eventThread.isAlive) {
+        eventQueue.put(event)
+      } else {
+        onError(new IllegalStateException(s"$name has already been stopped accidentally."))
+      }
+    }
   }
 
   /**

