Hi, I have been using DirectKafkaInputDStream in Spark Streaming to consume Kafka messages and it has been working very well. Now I need to batch process messages from Kafka, for example, retrieve all messages every hour and process them, writing the output to destinations like Hive or HDFS. I would like to use KafkaRDD and a normal Spark job to achieve this, so that much of the logic in my streaming code can be reused.
In the excellent blog post Exactly-Once Spark Streaming from Apache Kafka, there are code examples showing how to use KafkaRDD. However, it requires an array of OffsetRange, which needs to specify the start and end offset for each partition. My question is, should I write additional code to talk to Kafka and retrieve the latest offset for each partition every time this hourly job runs? Or is there a way to tell KafkaUtils to "read till latest" when creating the KafkaRDD? Thanks, Charles
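For context, here is a minimal sketch of what I mean by "additional code": fetching the per-partition latest offsets myself and pairing them with the offsets already processed, to produce the (topic, partition, from, until) values an OffsetRange needs. The `offsetRanges` helper and its inputs are hypothetical names of mine; in a real job the offset maps would presumably come from the Kafka consumer API (e.g. `beginningOffsets`/`endOffsets` on a KafkaConsumer), and each tuple would be turned into a real `OffsetRange` via `OffsetRange.create(topic, partition, from, until)`.

```scala
// Hypothetical helper: combine per-partition start offsets (where the last
// batch left off) with the latest offsets ("read till latest") into the
// (topic, partition, fromOffset, untilOffset) values that OffsetRange needs.
// Fetching the offset maps from Kafka itself is assumed to happen elsewhere.
def offsetRanges(
    topic: String,
    fromOffsets: Map[Int, Long],   // partition -> offset already processed up to
    untilOffsets: Map[Int, Long]   // partition -> latest offset on the broker
): Seq[(String, Int, Long, Long)] =
  untilOffsets.toSeq.sortBy(_._1).map { case (partition, until) =>
    // Partitions never seen before start from offset 0.
    (topic, partition, fromOffsets.getOrElse(partition, 0L), until)
  }

// Example: two partitions, each processed partway through.
val ranges = offsetRanges(
  "events",
  Map(0 -> 100L, 1 -> 50L),
  Map(0 -> 180L, 1 -> 75L)
)
```

This is only the bookkeeping half; the part I am asking about is whether I must also write the broker-querying half myself, or whether KafkaUtils can do it for me.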