Hi,

I've been struggling with an issue ever since I tried to upgrade my Spark
Streaming solution from 1.4.1 to 1.5+.

I have a Spark Streaming app that creates 3 ReceiverInputDStreams via the
KinesisUtils.createStream API.

I used to terminate my app gracefully via a timeout
(StreamingContext.awaitTerminationOrTimeout(timeout)), with
spark.streaming.stopGracefullyOnShutdown=true set in my SparkConf.
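
For reference, here is a minimal sketch of that setup. This is not my actual code: the app name, stream name, region, batch interval, and timeout are illustrative placeholders, but the APIs (KinesisUtils.createStream as of Spark 1.5+, awaitTerminationOrTimeout, and the graceful-shutdown flag) are the ones I use:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisUtils
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

object MyStreamingApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("myApp")
      // ask the shutdown hook to stop the StreamingContext gracefully
      .set("spark.streaming.stopGracefullyOnShutdown", "true")
    val batchInterval = Seconds(300)
    val ssc = new StreamingContext(conf, batchInterval)

    // 3 Kinesis receiver streams (endpoint/region/stream names are placeholders)
    val streams = (1 to 3).map { _ =>
      KinesisUtils.createStream(
        ssc, "myApp", "myStream",
        "kinesis.us-east-1.amazonaws.com", "us-east-1",
        InitialPositionInStream.LATEST, batchInterval,
        StorageLevel.MEMORY_AND_DISK_2)
    }

    // ... union the streams and apply the actual transformations here ...

    ssc.start()
    // run for a fixed period; when the timeout elapses, the driver exits and
    // the shutdown hook is expected to invoke stop(stopGracefully = true)
    val timeoutMs = 60L * 60 * 1000
    ssc.awaitTerminationOrTimeout(timeoutMs)
  }
}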

I used to submit my Spark app on EMR in yarn-cluster mode.
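
Roughly like this (class name, jar location, and resource settings are placeholders, not my real values):

```shell
# Hedged example of the submission; only --master yarn-cluster and the
# graceful-shutdown conf reflect what I actually use.
spark-submit \
  --master yarn-cluster \
  --conf spark.streaming.stopGracefullyOnShutdown=true \
  --class com.example.MyStreamingApp \
  s3://my-bucket/my-streaming-app.jar
```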

Everything worked fine up to Spark 1.4.1 (on EMR AMI 3.9).

Since upgrading (I tried Spark 1.5.2 on emr-4.2.0 and Spark 1.6.0 on
emr-4.3.0), I can't get the app to actually terminate. The logs tell me it
tries to, but no confirmation that the receivers have stopped ever arrives.
Instead, when the timer reaches the next period, the StreamingContext
continues processing for a while, until it is killed with a SIGTERM (15).
YARN's vmem and pmem kills are disabled.

...

16/02/02 21:22:08 INFO ApplicationMaster: Final app status: SUCCEEDED,
exitCode: 0
16/02/02 21:22:08 INFO StreamingContext: Invoking
stop(stopGracefully=true) from shutdown hook
16/02/02 21:22:08 INFO ReceiverTracker: Sent stop signal to all 3 receivers
16/02/02 21:22:18 INFO ReceiverTracker: Waiting for receiver job to
terminate gracefully
16/02/02 21:22:52 INFO ContextCleaner: Cleaned shuffle 141
16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_217_piece0
on 172.31.3.140:50152 in memory (size: 23.9 KB, free: 2.1 GB)
16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_217_piece0
on ip-172-31-3-141.ec2.internal:41776 in memory (size: 23.9 KB, free:
1224.9 MB)
16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_217_piece0
on ip-172-31-3-140.ec2.internal:36295 in memory (size: 23.9 KB, free:
1224.0 MB)
16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_217_piece0
on ip-172-31-3-141.ec2.internal:56428 in memory (size: 23.9 KB, free:
1224.9 MB)
16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_217_piece0
on ip-172-31-3-140.ec2.internal:50542 in memory (size: 23.9 KB, free:
1224.7 MB)
16/02/02 21:22:52 INFO ContextCleaner: Cleaned accumulator 184
16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_218_piece0
on 172.31.3.140:50152 in memory (size: 3.0 KB, free: 2.1 GB)
16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_218_piece0
on ip-172-31-3-141.ec2.internal:41776 in memory (size: 3.0 KB, free:
1224.9 MB)
16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_218_piece0
on ip-172-31-3-141.ec2.internal:56428 in memory (size: 3.0 KB, free:
1224.9 MB)
16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_218_piece0
on ip-172-31-3-140.ec2.internal:36295 in memory (size: 3.0 KB, free:
1224.0 MB)
16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_218_piece0
on ip-172-31-3-140.ec2.internal:50542 in memory (size: 3.0 KB, free:
1224.7 MB)
16/02/02 21:25:00 INFO StateDStream: Marking RDD 680 for time
1454448300000 ms for checkpointing
16/02/02 21:25:00 INFO StateDStream: Marking RDD 708 for time
1454448300000 ms for checkpointing
16/02/02 21:25:00 INFO TransformedDStream: Slicing from 1454448000000
ms to 1454448300000 ms (aligned to 1454448000000 ms and 1454448300000
ms)
16/02/02 21:25:00 INFO StateDStream: Marking RDD 777 for time
1454448300000 ms for checkpointing
16/02/02 21:25:00 INFO StateDStream: Marking RDD 801 for time
1454448300000 ms for checkpointing
16/02/02 21:25:00 INFO JobScheduler: Added jobs for time 1454448300000 ms
16/02/02 21:25:00 INFO JobGenerator: Checkpointing graph for time
1454448300000 ms
16/02/02 21:25:00 INFO DStreamGraph: Updating checkpoint data for time
1454448300000 ms
16/02/02 21:25:00 INFO JobScheduler: Starting job streaming job
1454448300000 ms.0 from job set of time 1454448300000 ms

...


Please help: this issue is blocking my upgrade to the latest Spark
versions, and I really don't know how to work around it.

Any help would be very much appreciated.

Thank you,

Roberto
