Here's my use case: messages for different 'events' arrive on a Kafka topic, and each event has a unique event ID. I need to aggregate a count per event, but only AFTER the event has completed. For now, we assume an event is complete once no new messages have arrived for it for X minutes.

How can I do this with Spark Streaming? I have been reading about 'Stateful Transformations' with 'mapWithState'. Is that the right approach? Sample code would be much appreciated. Thanks.
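
To make the intended behavior concrete, here is a rough plain-Python sketch of the logic I am after. It is not Spark code: the names (`IdleTimeoutCounter`, `on_message`, `flush_completed`, `timeout_seconds`) are made up for illustration, and timestamps are assumed to be plain seconds. My assumption is that the per-event state and the idle check are what a stateful transformation would handle for me.

```python
from dataclasses import dataclass


@dataclass
class EventState:
    """Running state kept for one event ID (hypothetical structure)."""
    count: int = 0
    last_seen: float = float("-inf")


class IdleTimeoutCounter:
    """Counts messages per event and emits a count once an event goes idle."""

    def __init__(self, timeout_seconds):
        # Corresponds to the "X minutes" of inactivity, expressed in seconds.
        self.timeout_seconds = timeout_seconds
        self.states = {}

    def on_message(self, event_id, timestamp):
        # Create the state on the first message, then update count and last-seen time.
        state = self.states.setdefault(event_id, EventState())
        state.count += 1
        state.last_seen = max(state.last_seen, timestamp)

    def flush_completed(self, now):
        # An event is treated as complete when it has been idle for the full timeout.
        done = {
            event_id: state.count
            for event_id, state in self.states.items()
            if now - state.last_seen >= self.timeout_seconds
        }
        # Drop the state of completed events so it does not grow without bound.
        for event_id in done:
            del self.states[event_id]
        return done


if __name__ == "__main__":
    counter = IdleTimeoutCounter(timeout_seconds=300)
    counter.on_message("A", 0)
    counter.on_message("A", 10)
    counter.on_message("B", 200)
    print(counter.flush_completed(now=310))
```

In this model, event "A" is emitted with its final count only after 300 seconds without new messages, while "B" stays in state because it is still within the timeout. One consequence of this definition is that a late message for an already-flushed event would start a new count from zero.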