Jack Hu created SPARK-3275:
------------------------------

             Summary: Socket receiver cannot recover when the socket server is restarted
                 Key: SPARK-3275
                 URL: https://issues.apache.org/jira/browse/SPARK-3275
             Project: Spark
          Issue Type: Bug
          Components: Streaming
    Affects Versions: 1.0.2
            Reporter: Jack Hu


To reproduce this issue:
1. Create an application with a socket DStream.
2. Start the socket server, then start the application.
3. Restart the socket server.
4. The socket DStream fails to recover (it connects successfully, then 
closes the connection).

The main issue appears to be that the state tracked by SocketReceiver and 
ReceiverSupervisor is incorrect after the reconnect.
In SocketReceiver.receive(), the while loop is never entered after a 
reconnect because isStopped returns true:
      val iterator = bytesToObjects(socket.getInputStream())
      while(!isStopped && iterator.hasNext) {
        store(iterator.next)
      }
      logInfo("Stopped receiving")
      restart("Retrying connecting to " + host + ":" + port)

This happens because the state flag "receiverState" in ReceiverSupervisor is 
set to Stopped when the connection is lost, and it is set back to Started only 
after the receiver's onStart method has been called:

  def startReceiver(): Unit = synchronized {
    try {
      logInfo("Starting receiver")
      receiver.onStart()
      logInfo("Called receiver onStart")
      onReceiverStart()
      receiverState = Started
    } catch {
      case t: Throwable =>
        stop("Error starting receiver " + streamId, Some(t))
    }
  }
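
The ordering problem described above can be illustrated with a small 
self-contained model. This is only a sketch, not Spark code: ToySupervisor, 
setStateBeforeOnStart and storedAfterRestart are hypothetical names, and the 
receive loop runs inline rather than on a separate thread so that the outcome 
is deterministic. It assumes that one possible remedy is to set the state to 
Started before onStart is invoked.

```scala
object ReceiverRestartSketch {
  // Simplified stand-ins for the supervisor's states; not Spark's own classes.
  sealed trait State
  case object Initialized extends State
  case object Started extends State
  case object Stopped extends State

  // setStateBeforeOnStart = false mimics the ordering quoted in the report;
  // true is the hypothetical reordering (an assumption, not a confirmed fix).
  class ToySupervisor(setStateBeforeOnStart: Boolean, input: () => Iterator[String]) {
    @volatile private var receiverState: State = Initialized
    val stored = scala.collection.mutable.ArrayBuffer.empty[String]

    def isReceiverStopped: Boolean = receiverState == Stopped

    // Stand-in for receiver.onStart(): runs the receive loop inline.
    // The real receiver would run this loop on its own thread.
    private def onStart(): Unit = {
      val iterator = input()
      while (!isReceiverStopped && iterator.hasNext) {
        stored += iterator.next()
      }
    }

    def startReceiver(): Unit = synchronized {
      if (setStateBeforeOnStart) receiverState = Started
      onStart()
      receiverState = Started
    }

    def stopReceiver(): Unit = synchronized { receiverState = Stopped }
  }

  // Starts, stops and starts again; returns how many records the second start stored.
  def storedAfterRestart(setStateBeforeOnStart: Boolean): Int = {
    val supervisor = new ToySupervisor(setStateBeforeOnStart, () => Iterator("a", "b", "c"))
    supervisor.startReceiver()
    val before = supervisor.stored.size
    supervisor.stopReceiver()
    supervisor.startReceiver()
    supervisor.stored.size - before
  }
}
```

With the ordering from the report, storedAfterRestart(false) stores nothing 
after the restart because the loop still sees the Stopped state; with the 
state set first, storedAfterRestart(true) stores all three records.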



--
This message was sent by Atlassian JIRA
(v6.2#6252)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
