Hi there,

I'm facing a weird issue when upgrading from Spark 1.4.1 streaming driver
on EMR 3.9 (hence Hadoop 2.4.0) to Spark 1.5.2 on EMR 4.2 (hence Hadoop
2.6.0).

Basically, the very same driver which used to terminate after a timeout as
expected, now does not. In particular, as long as the driver's logs could
tell me, the StreamingContext seems to be stopped with success (and exit
code 0), but the Hadoop/YARN job does not terminate/complete. Instead,
after a couple of minutes hanging, the driver just seems to start its
processing again! Here follows a logs stack example collected during stop.

16/01/12 19:17:32 INFO ApplicationMaster: Final app status: SUCCEEDED,
> exitCode: 0
> 16/01/12 19:17:32 INFO StreamingContext: Invoking
> stop(stopGracefully=true) from shutdown hook
> 16/01/12 19:17:32 INFO ReceiverTracker: Sent stop signal to all 3 receivers
> 16/01/12 19:17:32 ERROR ReceiverTracker: Deregistered receiver for stream
> 1: Stopped by driver
> 16/01/12 19:17:33 ERROR ReceiverTracker: Deregistered receiver for stream
> 2: Stopped by driver
> 16/01/12 19:17:33 INFO TaskSetManager: Finished task 0.0 in stage 8.0 (TID
> 97) in 1200804 ms on ip-172-31-9-4.ec2.internal (1/1)
> 16/01/12 19:17:33 INFO DAGScheduler: ResultStage 8 (start at
> NetflowStreamingApp.scala:68) finished in 1200.806 s
> 16/01/12 19:17:33 INFO YarnClusterScheduler: Removed TaskSet 8.0, whose
> tasks have all completed, from pool
> 16/01/12 19:17:33 ERROR ReceiverTracker: Deregistered receiver for stream
> 0: Stopped by driver
> 16/01/12 19:17:34 INFO TaskSetManager: Finished task 0.0 in stage 12.0
> (TID 101) in 1199753 ms on ip-172-31-9-4.ec2.internal (1/1)
> 16/01/12 19:17:34 INFO YarnClusterScheduler: Removed TaskSet 12.0, whose
> tasks have all completed, from pool
> 16/01/12 19:17:34 INFO DAGScheduler: ResultStage 12 (start at
> NetflowStreamingApp.scala:68) finished in 1199.753 s
> 16/01/12 19:17:34 INFO TaskSetManager: Finished task 0.0 in stage 7.0 (TID
> 96) in 1201854 ms on ip-172-31-9-5.ec2.internal (1/1)
> 16/01/12 19:17:34 INFO DAGScheduler: ResultStage 7 (start at
> NetflowStreamingApp.scala:68) finished in 1201.855 s
> 16/01/12 19:17:34 INFO YarnClusterScheduler: Removed TaskSet 7.0, whose
> tasks have all completed, from pool
> 16/01/12 19:17:34 INFO ReceiverTracker: Waiting for receiver job to
> terminate gracefully
> 16/01/12 19:17:34 INFO ReceiverTracker: Waited for receiver job to
> terminate gracefully
> 16/01/12 19:17:34 INFO ReceiverTracker: All of the receivers have
> deregistered successfully
> 16/01/12 19:17:34 INFO WriteAheadLogManager : Stopped write ahead log
> manager
> 16/01/12 19:17:34 INFO ReceiverTracker: ReceiverTracker stopped
> 16/01/12 19:17:34 INFO JobGenerator: Stopping JobGenerator gracefully
> 16/01/12 19:17:34 INFO JobGenerator: Waiting for all received blocks to be
> consumed for job generation


The "receivers" mentioned in the logs are the Kinesis streams receivers.

In my Scala 2.10 based driver, I just use
StreamingContext.awaitTerminationOrTimeout(timeout) API  (called right
after StreamingContext.start()) and set the
SparkConf spark.streaming.stopGracefullyOnShutdown=true.

Did anybody experience anything similar?

Any help would be appreciated.

Thanks,

Roberto

Reply via email to