Thanks for your quick answer.

If I set "auto.offset.reset" to "smallest" as for KafkaParams like this

val kafkaParams = Map[String, String](
  "metadata.broker.list" -> brokers,
  "group.id" -> groupId,
  "auto.offset.reset" -> "smallest"
)

And then use:

import kafka.serializer.StringDecoder

val streams = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, kafkaTopics, StorageLevel.MEMORY_AND_DISK_SER_2)

My fear is that every time I deploy a new version, all the consumer's
topics will be re-read from the beginning. But as Kafka's documentation
says, the reset only applies when no offset has been committed yet:

auto.offset.reset (default: largest)

What to do when there is no initial offset in ZooKeeper or if an
offset is out of range:
* smallest: automatically reset the offset to the smallest offset

So I will go for this option the next time I need to process a new topic 👍

To fix my current problem, since the topic has already been processed
and its offsets registered in ZK, I will use a directStream starting
from "smallest", remove all the DB inserts for this topic, and restart
a "normal" stream once the lag is caught up.
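
For reference, here is a minimal sketch of that catch-up step (assuming
the same brokers, groupId, ssc and kafkaTopics values as in the snippet
above; directParams and directStream are just illustrative names). Note
that the direct stream ignores the group's offsets stored in ZK, so
"smallest" makes it replay the topic from its earliest available offset,
and it does not commit offsets back to ZooKeeper by itself:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// The direct stream does not use the consumer group's ZK offsets, so
// "auto.offset.reset" -> "smallest" reads the topic from the beginning.
val directParams = Map[String, String](
  "metadata.broker.list" -> brokers,
  "group.id" -> groupId,
  "auto.offset.reset" -> "smallest"
)

// createDirectStream takes a Set of topic names rather than a
// Map of topic -> partition count, hence kafkaTopics.keySet.
val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, directParams, kafkaTopics.keySet)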


2016-02-22 10:57 GMT+01:00 Saisai Shao <sai.sai.s...@gmail.com>:

> You could set this configuration "auto.offset.reset" through parameter
> "kafkaParams" which is provided in some other overloaded APIs of
> createStream.
>
> By default Kafka will pick data from the latest offset unless you explicitly
> set it; this is the behavior of Kafka, not Spark.
>
> Thanks
> Saisai
>
> On Mon, Feb 22, 2016 at 5:52 PM, Paul Leclercq <paul.lecle...@tabmo.io>
> wrote:
>
>> Hi,
>>
>> Do you know why, with the receiver approach
>> <http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-1-receiver-based-approach>
>> and a *consumer group*, a new topic is not read from the beginning but
>> from the latest?
>>
>> Code example:
>>
>>  val kafkaStream = KafkaUtils.createStream(streamingContext,
>>      [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
>>
>>
>> Is there a way to tell it, *only for a new topic*, to read from the beginning?
>>
>> From Confluence FAQ
>>
>>> Alternatively, you can configure the consumer by setting
>>> auto.offset.reset to "earliest" for the new consumer in 0.9 and "smallest"
>>> for the old consumer.
>>
>>
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whydoesmyconsumernevergetanydata?
>>
>> Thanks
>> --
>>
>> Paul Leclercq
>>
>
>


-- 

Paul Leclercq | Data engineer


paul.lecle...@tabmo.io | http://www.tabmo.fr/
