Jack Hu created SPARK-3275:
------------------------------

             Summary: Socket receiver can not recover when the socket server restarted
                 Key: SPARK-3275
                 URL: https://issues.apache.org/jira/browse/SPARK-3275
             Project: Spark
          Issue Type: Bug
          Components: Streaming
    Affects Versions: 1.0.2
            Reporter: Jack Hu
To reproduce this issue:

1. Create an application with a socket DStream.
2. Start the socket server, then start the application.
3. Restart the socket server.
4. The socket DStream fails to reconnect: it closes the connection right after a successful connect.

The main issue appears to be that the receiver state shared by SocketReceiver and ReceiverSupervisor is incorrect after the reconnect.

In SocketReceiver::receive(), the while loop is never entered after the reconnect, because isStopped returns true:

    val iterator = bytesToObjects(socket.getInputStream())
    while(!isStopped && iterator.hasNext) {
      store(iterator.next)
    }
    logInfo("Stopped receiving")
    restart("Retrying connecting to " + host + ":" + port)

This is caused by the status flag "receiverState" in ReceiverSupervisor. It is set to Stopped when the connection is lost, but it is only set back to Started after the receiver's onStart() method has returned. Since onStart() starts the thread that runs receive(), that thread can check isStopped while receiverState is still Stopped:

    def startReceiver(): Unit = synchronized {
      try {
        logInfo("Starting receiver")
        receiver.onStart()
        logInfo("Called receiver onStart")
        onReceiverStart()
        receiverState = Started
      } catch {
        case t: Throwable =>
          stop("Error starting receiver " + streamId, Some(t))
      }
    }
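The ordering problem can be illustrated with a small, self-contained Scala model. This is a hypothetical sketch, not Spark code: ToySupervisor, loopEntered, run and setStartedFirst are invented names. It assumes, as described above, that onStart() launches the receiving thread and that isStopped is true only when the state is Stopped (so the very first start, from Initialized, works). The model joins the spawned thread inside onStart() to force the interleaving in which receive() checks the flag before startReceiver() sets Started. The setStartedFirst = true variant is shown only for contrast, as one possible ordering.

```scala
// Hypothetical model of the receiver state handling described in this issue.
// Names are illustrative; this is NOT Spark's ReceiverSupervisor or SocketReceiver.
object ReceiverStateRace {
  sealed trait State
  case object Initialized extends State
  case object Started extends State
  case object Stopped extends State

  final class ToySupervisor(setStartedFirst: Boolean) {
    @volatile private var receiverState: State = Initialized
    @volatile var loopEntered: Boolean = false

    // Assumption: "stopped" means the state is exactly Stopped.
    def isReceiverStopped: Boolean = receiverState == Stopped

    // Models SocketReceiver.onStart(): spawn a thread that runs receive().
    // Joining the thread forces the interleaving where the receive loop
    // checks the flag before startReceiver() updates the state.
    private def onStart(): Unit = {
      val t = new Thread(new Runnable {
        def run(): Unit = {
          // Models `while (!isStopped && iterator.hasNext)`; one check is enough here.
          if (!isReceiverStopped) loopEntered = true
        }
      })
      t.start()
      t.join()
    }

    // Models what happens when the connection is lost.
    def stopReceiver(): Unit = synchronized { receiverState = Stopped }

    def startReceiver(): Unit = synchronized {
      if (setStartedFirst) { receiverState = Started; onStart() }
      else { onStart(); receiverState = Started } // ordering reported in this issue
    }

    def restartReceiver(): Unit = { stopReceiver(); startReceiver() }
  }

  // Returns whether the receive loop is entered after a restart.
  def run(setStartedFirst: Boolean): Boolean = {
    val sup = new ToySupervisor(setStartedFirst)
    sup.startReceiver()   // first start: state is Initialized, so the loop runs
    sup.loopEntered = false
    sup.restartReceiver() // socket server restarted -> receiver restarted
    sup.loopEntered
  }

  def main(args: Array[String]): Unit = {
    println(s"Started set after onStart:  loop entered = ${run(setStartedFirst = false)}")
    println(s"Started set before onStart: loop entered = ${run(setStartedFirst = true)}")
  }
}
```

In this model the first start succeeds because the state is Initialized rather than Stopped, which matches the report that only the reconnect fails.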