Kafka streaming receiver approach - new topic not read from beginning

2016-02-22 Thread Paul Leclercq
Hi,

Do you know why, with the receiver approach

and a *consumer group*, a new topic is not read from the beginning but from
the latest offset?

Code example:

 val kafkaStream = KafkaUtils.createStream(streamingContext,
  [ZK quorum], [consumer group id], [per-topic number of Kafka
partitions to consume])


Is there a way to tell it, *only for a new topic*, to read from the beginning?

From the Confluence FAQ:

> Alternatively, you can configure the consumer by setting auto.offset.reset
> to "earliest" for the new consumer in 0.9 and "smallest" for the old
> consumer.


https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whydoesmyconsumernevergetanydata?

Thanks
-- 

Paul Leclercq


Re: Kafka streaming receiver approach - new topic not read from beginning

2016-02-22 Thread Saisai Shao
You could set this configuration "auto.offset.reset" through parameter
"kafkaParams" which is provided in some other overloaded APIs of
createStream.

By default Kafka will pick up data from the latest offset unless you
explicitly set it; this is the behavior of Kafka, not Spark.
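As a minimal sketch of that suggestion (the ZK quorum, group id, and topic name below are placeholders, not from the thread), the overloaded `createStream` that accepts a `kafkaParams` map lets you pass `auto.offset.reset` down to the old (0.8-era) consumer:

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

// kafkaParams is forwarded to the underlying Kafka consumer configuration
val kafkaParams = Map[String, String](
  "zookeeper.connect" -> "zk1:2181",          // placeholder ZK quorum
  "group.id"          -> "my-consumer-group", // placeholder group id
  "auto.offset.reset" -> "smallest"           // old consumer uses "smallest", not "earliest"
)

// topic name -> number of consumer threads for that topic
val topics = Map("my-topic" -> 1)

val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2)
```

Note that `auto.offset.reset` only takes effect when the group has no committed offset in ZooKeeper (or the committed offset is out of range); a group that has already committed offsets keeps reading from its committed position.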

Thanks
Saisai



Re: Kafka streaming receiver approach - new topic not read from beginning

2016-02-22 Thread Paul Leclercq
Thanks for your quick answer.

If I set "auto.offset.reset" to "smallest" in the kafkaParams like this:

val kafkaParams = Map[String, String](
 "metadata.broker.list" -> brokers,
 "group.id" -> groupId,
 "auto.offset.reset" -> "smallest"
)

And then use:

val streams = KafkaUtils.createStream(ssc, kafkaParams, kafkaTopics,
StorageLevel.MEMORY_AND_DISK_SER_2)

My fear is that, every time I deploy a new version, all the consumer's
topics are going to be read from the beginning, but as stated in Kafka's
documentation:

auto.offset.reset (default: largest)

What to do when there *is no initial offset in ZooKeeper* or if an
offset is out of range:
* smallest: automatically reset the offset to the smallest offset

So I will go for this option the next time I need to process a new topic 👍

To fix my problem, as the topic has already been processed and its
offsets registered in ZK, I will use a directStream from smallest,
remove all DB inserts for this topic, and restart a "normal" stream
once the lag has been caught up.
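That interim step could look roughly like this (the broker list and topic name are placeholders); with `createDirectStream` the offsets are tracked by Spark via checkpointing rather than committed to ZK, so the receiver-based group's stored offsets are left untouched:

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map[String, String](
  "metadata.broker.list" -> "broker1:9092", // placeholder broker list
  "auto.offset.reset"    -> "smallest"      // replay from the earliest retained offset
)

// Direct stream: no receiver; offsets are tracked by Spark, not ZooKeeper
val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("my-topic"))
```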




-- 

Paul Leclercq | Data engineer


 paul.lecle...@tabmo.io  |  http://www.tabmo.fr/


Re: Kafka streaming receiver approach - new topic not read from beginning

2016-02-23 Thread Paul Leclercq
I successfully processed my data by manually resetting my topic offsets in
ZK.

In case it helps someone, here are my steps:

Make sure you stop all your consumers before doing this, otherwise they
will overwrite the new offsets you wrote.

set /consumers/{yourConsumerGroup}/offsets/{yourFancyTopic}/{partitionId} {newOffset}


Source : https://metabroadcast.com/blog/resetting-kafka-offsets
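As a concrete sketch of those steps (the ZK address, group id, topic, and partition below are example values to adapt), the `set` command can be issued with the `zkCli.sh` shipped with ZooKeeper:

```shell
# Stop all consumers in the group first, or they will overwrite these offsets.

ZK=localhost:2181
GROUP=my-consumer-group   # example group id
TOPIC=my-topic            # example topic

# Inspect the currently committed offset for partition 0
zkCli.sh -server "$ZK" get "/consumers/$GROUP/offsets/$TOPIC/0"

# Reset partition 0 to the beginning of the topic
zkCli.sh -server "$ZK" set "/consumers/$GROUP/offsets/$TOPIC/0" 0
```

Repeat the `set` for each partition of the topic, then restart the consumers.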
