I really like the receiverless ("direct") Spark Streaming API for Kafka, but I'm finding that manual offset management adds a fair bit of complexity to streaming jobs. I suspect others feel the same way, so I'm proposing that we add an easy-to-use API for managing consumer offsets, similar to what the receiver-based API already provides.
I haven't written any code yet, but I've looked at the current codebase and have an idea of how this could be done. To keep the pull requests small, I propose adding the following features in order:

1. If a group ID is set in the Kafka params and fromOffsets is not passed into createDirectStream, attempt to resume from the offsets remembered for that group ID.
2. Add a method on KafkaRDD that commits that RDD's offsets to ZooKeeper.
3. Update the Python API with any necessary changes.

My goal is to add the new functionality without breaking the existing API.

The part I'm least sure about is the first step. I can't decide whether it's better to pass the group ID through the Kafka params, as described above, or to define a new overload of createDirectStream that takes the group ID in place of the fromOffsets parameter. I think the latter is a cleaner interface, but I'm not sure whether introducing a new parameter is a good idea.

If anyone has feedback on this general approach, I'd be very grateful. I'm going to open a JIRA in the next couple of days and start on the first step, but comments from the community would be very helpful in building a good API here.
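To make steps 1 and 2 concrete, here is a small, self-contained Python sketch of the intended behavior. It is not Spark code, and every name in it is hypothetical: `InMemoryOffsetStore` stands in for the ZooKeeper path where a group's offsets would be kept, `resolve_start_offsets` and `commit_offsets` are illustrative names rather than a proposed API, and `default_offsets` stands in for wherever the stream would otherwise start (for example, as chosen by `auto.offset.reset`). `OffsetRange` only mirrors the fields of Spark's class of the same name. The sketch assumes the Kafka-params option (`group.id`) from the open question.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, Optional, Tuple

# Hypothetical key for one Kafka partition: (topic, partition).
TopicPartition = Tuple[str, int]


@dataclass(frozen=True)
class OffsetRange:
    """Mirrors the fields of Spark's OffsetRange; until_offset is exclusive."""
    topic: str
    partition: int
    from_offset: int
    until_offset: int


class InMemoryOffsetStore:
    """Hypothetical stand-in for the ZooKeeper path holding a group's offsets."""

    def __init__(self) -> None:
        self._offsets: Dict[str, Dict[TopicPartition, int]] = {}

    def read(self, group_id: str) -> Dict[TopicPartition, int]:
        return dict(self._offsets.get(group_id, {}))

    def write(self, group_id: str, offsets: Dict[TopicPartition, int]) -> None:
        self._offsets.setdefault(group_id, {}).update(offsets)


def resolve_start_offsets(
    kafka_params: Dict[str, str],
    from_offsets: Optional[Dict[TopicPartition, int]],
    store: InMemoryOffsetStore,
    default_offsets: Dict[TopicPartition, int],
) -> Dict[TopicPartition, int]:
    """Step 1: explicit fromOffsets always win. Otherwise, if group.id is set,
    resume from that group's remembered offsets, falling back to the default
    for any partition the group has never committed."""
    if from_offsets is not None:
        return dict(from_offsets)
    group_id = kafka_params.get("group.id")
    if group_id is None:
        return dict(default_offsets)
    remembered = store.read(group_id)
    return {tp: remembered.get(tp, off) for tp, off in default_offsets.items()}


def commit_offsets(
    group_id: str, ranges: Iterable[OffsetRange], store: InMemoryOffsetStore
) -> None:
    """Step 2: after a batch is processed, record each partition's exclusive
    until_offset, i.e. the next offset to read on restart."""
    store.write(group_id, {(r.topic, r.partition): r.until_offset for r in ranges})


if __name__ == "__main__":
    store = InMemoryOffsetStore()
    params = {"metadata.broker.list": "localhost:9092", "group.id": "my-group"}
    defaults = {("events", 0): 0, ("events", 1): 0}

    # Nothing committed yet, so the defaults are used.
    print(resolve_start_offsets(params, None, store, defaults))
    # prints {('events', 0): 0, ('events', 1): 0}

    batch = [OffsetRange("events", 0, 0, 100), OffsetRange("events", 1, 0, 40)]
    commit_offsets("my-group", batch, store)

    # A restarted job with the same group.id resumes where the batch ended.
    print(resolve_start_offsets(params, None, store, defaults))
    # prints {('events', 0): 100, ('events', 1): 40}
```

Because explicit `from_offsets` take precedence and nothing changes when `group.id` is absent, existing callers would see the same behavior as today, which matches the goal of not breaking the current API. The overload alternative would simply take the group ID as its own argument instead of reading it from `kafka_params`.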