Hi Sachin,

What do you mean by "with this commented"? Did you set auto.offset.reset
to "earliest" or not? The default value is "latest", and if you do not set
it to "earliest", then the application will start consuming from
end-of-topic if no committed offsets are found.
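
To illustrate the difference between setting the property and leaving it
commented out, here is a minimal sketch. It uses plain java.util.Properties
with literal keys so that it runs without the Kafka jars; the keys are the
string values behind StreamsConfig.APPLICATION_ID_CONFIG,
StreamsConfig.BOOTSTRAP_SERVERS_CONFIG and
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG. The class name, the helper
effectiveResetPolicy() and the broker address are made up for illustration,
and the fallback to "latest" simply mirrors the consumer default described
above.

```java
import java.util.Properties;

public class OffsetResetExample {

    // Literal config keys (same strings as the Kafka config constants).
    static final String APPLICATION_ID = "application.id";
    static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
    static final String AUTO_OFFSET_RESET = "auto.offset.reset";

    // Builds the configuration. Pass null to leave auto.offset.reset unset,
    // which is what happens when the props.put(...) line is commented out.
    static Properties buildProps(String resetPolicy) {
        Properties props = new Properties();
        props.put(APPLICATION_ID, "test");
        props.put(BOOTSTRAP_SERVERS, "localhost:9092"); // placeholder address
        if (resetPolicy != null) {
            props.put(AUTO_OFFSET_RESET, resetPolicy);
        }
        return props;
    }

    // Hypothetical helper: the policy that applies when no committed offset
    // is found, assuming the consumer default of "latest".
    static String effectiveResetPolicy(Properties props) {
        return props.getProperty(AUTO_OFFSET_RESET, "latest");
    }

    public static void main(String[] args) {
        // Property commented out: falls back to the assumed default.
        System.out.println(effectiveResetPolicy(buildProps(null)));
        // Property set explicitly.
        System.out.println(effectiveResetPolicy(buildProps("earliest")));
    }
}
```

In the real application you would pass the Properties object to
KafkaStreams as in the snippet quoted below; the policy only matters when
the consumer finds no committed offsets for the application id.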

For default values of Kafka Streams parameters see
http://docs.confluent.io/3.0.1/streams/developer-guide.html#configuring-a-kafka-streams-application

For default consumer parameters see
http://kafka.apache.org/0100/documentation.html#newconsumerconfigs


-Matthias


On 12/27/16 3:27 PM, Sachin Mittal wrote:
> Hi,
> I started my new streams app with this commented
> //props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> 
> What I observed was that it started consuming message from latest offset
> from the source topic.
> Based on comments by Eno I thought that if offset do not exist then
> 
> So in practice an app will include the property below (e.g., set to
> "earliest")
> 
> However it picked the latest offset. Also I checked the property
> auto.offset.reset in the latest doc and I see it as largest.
> 
> So now I am confused that in streams application what is the default offset
> if none exist?
> 
> note I am using version kafka_2.10-0.10.0.1.
> 
> Thanks
> Sachin
> 
> 
> 
> On Mon, Nov 21, 2016 at 7:08 PM, Eno Thereska <eno.there...@gmail.com>
> wrote:
> 
>> Hi Sachin,
>>
>> There is no need to check within the app whether the offset exists or not,
>> since the consumer code will do that check automatically for you. So in
>> practice an app will include the property below (e.g., set to "earliest"),
>> but that will only have an effect if the consumer detects that the offsets
>> do not exist anymore. If the offsets exist, then that line is a no-op.
>>
>> So in summary, I'd just include that property, and no more code changes
>> are required.
>>
>> Thanks
>> Eno
>>
>>> On 21 Nov 2016, at 12:11, Sachin Mittal <sjmit...@gmail.com> wrote:
>>>
>>> So in my java code how can I check
>>> when there is no initial offset in Kafka or if the current offset does not
>>> exist any more on the server (e.g. because that data has been deleted)
>>>
>>> So in this case as you have said I can set offset as
>>> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); //or latest
>>>
>>> Thanks
>>> Sachin
>>>
>>>
>>> On Mon, Nov 21, 2016 at 4:16 PM, Eno Thereska <eno.there...@gmail.com>
>>> wrote:
>>>
>>>> Hi Sachin,
>>>>
>>>> Currently you can only change the following global configuration by using
>>>> "earliest" or "latest", as shown in the code snippet below. As the Javadoc
>>>> mentions: "What to do when there is no initial offset in Kafka or if the
>>>> current offset does not exist any more on the server (e.g. because that
>>>> data has been deleted)":
>>>>
>>>> ...
>>>> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>>>> ...
>>>> return new KafkaStreams(builder, props);
>>>>
>>>>
>>>>
>>>> In addition, there is a tool to reset the offsets of all topics to the
>>>> beginning. This is useful for reprocessing:
>>>> https://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/
>>>>
>>>> However, there is no option currently for resetting the offset to an
>>>> arbitrary offset.
>>>>
>>>> Thanks
>>>> Eno
>>>>
>>>>> On 21 Nov 2016, at 10:37, Sachin Mittal <sjmit...@gmail.com> wrote:
>>>>>
>>>>> Hi
>>>>> I am running a streaming application with
>>>>> streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
>>>>>
>>>>> How do I find out the offsets for each of the source, intermediate and
>>>>> internal topics associated with this application.
>>>>>
>>>>> And how can I reset them to some specific value via shell or otherwise.
>>>>>
>>>>> Thanks
>>>>> Sachin
>>
>>
> 
