[ 
https://issues.apache.org/jira/browse/KAFKA-7318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16605116#comment-16605116
 ] 

Matthias J. Sax commented on KAFKA-7318:
----------------------------------------

{quote}Thanks for the answer, but I don't understand why a new policy would not 
resolve the issue. 'Triggers if there are no committed offsets' is exactly what 
we want, because after subscribing there is no committed offset; that is the 
root cause of why the consumer cannot poll the messages that were produced 
while it was offline once it becomes alive again.
{quote}
Not sure if I can follow here.

If you want "trigger if there is no committed offset", what offset do you want 
to use if not "earliest" or "latest"? I am not sure what you mean by "consume 
only the messages after subscribing": this sounds exactly like the "latest" 
policy to me. I guess I misunderstand your request.
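
To make the question concrete, here is a minimal sketch of how the starting offset is chosen. It is a toy model under stated assumptions, not Kafka's actual code: the class `OffsetResetModel`, the method `startOffset`, and its parameters are hypothetical names used only for illustration, and the model ignores the out-of-range case. The point is that `auto.offset.reset` only matters when there is no committed offset, and then the only positions to choose from are the start or the end of the log.

```java
import java.util.OptionalLong;

/** Toy model of how a consumer picks its starting offset; not Kafka's implementation. */
final class OffsetResetModel {

    /** Mirrors the three values accepted by auto.offset.reset. */
    enum ResetPolicy { EARLIEST, LATEST, NONE }

    /**
     * @param committed offset committed for this group and partition, if any
     * @param logStart  first offset still available in the partition
     * @param logEnd    offset the next produced record will receive
     */
    static long startOffset(OptionalLong committed, long logStart, long logEnd, ResetPolicy policy) {
        // A committed offset always wins; the reset policy is not consulted.
        if (committed.isPresent()) {
            return committed.getAsLong();
        }
        // No committed offset: the policy decides between the two ends of the log.
        switch (policy) {
            case EARLIEST:
                return logStart;
            case LATEST:
                return logEnd;
            default:
                // Assumption: the real client raises its own exception type here.
                throw new IllegalStateException("no committed offset and reset policy is 'none'");
        }
    }

    public static void main(String[] args) {
        // Committed offset present: the policy is irrelevant.
        System.out.println(startOffset(OptionalLong.of(42L), 0L, 100L, ResetPolicy.LATEST));
        // No committed offset: "latest" starts at the end of the log.
        System.out.println(startOffset(OptionalLong.empty(), 0L, 100L, ResetPolicy.LATEST));
        // No committed offset: "earliest" starts at the beginning of the log.
        System.out.println(startOffset(OptionalLong.empty(), 0L, 100L, ResetPolicy.EARLIEST));
    }
}
```

In this model, "consume only the messages after subscribing" corresponds to the `LATEST` branch, plus committing that position so that a later restart takes the committed-offset branch instead.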

For the code snippet you provided: in `consumer.endOffsets()` you need to pass 
in the list of partitions you want to get the end offsets for; if you pass in 
an empty list, you get an empty map back. Similarly for `seekToEnd()`: if you 
pass an empty list, the seek is a no-op.

> Should introduce a offset reset policy to consume only the messages after 
> subscribing
> -------------------------------------------------------------------------------------
>
>                 Key: KAFKA-7318
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7318
>             Project: Kafka
>          Issue Type: Improvement
>          Components: consumer
>    Affects Versions: 1.1.0, 1.1.1, 2.0.0
>            Reporter: joechen8...@gmail.com
>            Priority: Major
>         Attachments: KafkaTest.java, KafkaTest.java, KafkaTest.java
>
>
> In our situation, we want the consumers to consume only the messages that 
> were produced after subscribing.
> Currently, Kafka supports 3 strategies for auto.offset.reset, but it seems 
> none of them supports the feature we want.
>  * {{latest}} (the default): if a consumer subscribes to a new topic and then 
> closes, and some messages are produced during that time, the consumer 
> cannot poll these messages.
>  * {{earliest}}: the consumer may consume all the messages that were on the 
> topic before subscribing.
>  * {{none}}: not in scope here.
> Before version 1.1.0, we made the consumer poll and commit after subscribing, 
> as below; this can mark the offset as 0 and works (enable.auto.commit is 
> false).
>  
> {code:java}
> consumer.subscribe(topics, consumerRebalanceListener);
> if(consumer.assignment().isEmpty()) {
>    consumer.poll(0);
>    consumer.commitSync();
> }
> {code}
> After upgrading the clients to >=1.1.0, it is broken. It seems it was broken 
> by the fix 
> [KAFKA-6397|https://github.com/apache/kafka/commit/677881afc52485aef94150be50d6258d7a340071#diff-267b7c1e68156c1301c56be63ae41dd0],
>  but I am not sure about that. Then I tried invoking 
> consumer.position(partitions) in onPartitionsAssigned of the 
> ConsumerRebalanceListener, and it works again, but it looks strange to get 
> the position and then do nothing with it.
>  
> So we want to know whether there is a formal way to do this; maybe introducing 
> another strategy for auto.offset.reset that consumes only the messages 
> produced after the consumer subscribes would be ideal.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
