[ 
https://issues.apache.org/jira/browse/KAFKA-7318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16594524#comment-16594524
 ] 

joechen8...@gmail.com edited comment on KAFKA-7318 at 8/28/18 5:06 AM:
-----------------------------------------------------------------------

I try your solution as below but not work.  After subscribing, the partition 
assignment have not be triggered 
{code:java}
consumer.subscribe(kafkaTopics, consumerRebalanceListener);
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(new ArrayList<>());
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
for(TopicPartition p : endOffsets.keySet()){
offsets.put(p, new OffsetAndMetadata(endOffsets.get(p)));
}
consumer.commitSync(offsets);
consumer.seekToEnd(new ArrayList<>());
{code}


was (Author: joechen):
I try your solution as below but not work. 
{code:java}
consumer.subscribe(kafkaTopics, consumerRebalanceListener);
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(new ArrayList<>());
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
for(TopicPartition p : endOffsets.keySet()){
offsets.put(p, new OffsetAndMetadata(endOffsets.get(p)));
}
consumer.commitSync(offsets);
consumer.seekToEnd(new ArrayList<>());
{code}

> Should introduce a offset reset policy to consume only the messages after 
> subscribing
> -------------------------------------------------------------------------------------
>
>                 Key: KAFKA-7318
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7318
>             Project: Kafka
>          Issue Type: Improvement
>          Components: consumer
>    Affects Versions: 1.1.0, 1.1.1, 2.0.0
>            Reporter: joechen8...@gmail.com
>            Priority: Major
>
> On our situation, we want the consumers only consume the messages which was 
> produced after subscribing.   
> Currently, kafka support 3 stategies with auto.offset.reset, but seems both 
> of them can not support the feature we want.
>  * {{latest}} (the default) , if a consumer subscribe a new topic and then 
> close, during these times, there are some message was produced,  the consumer 
> can not poll these messages.
>  * earliest , consumer may consume all the messages on the topic  before 
> subscribing.
>  * none, not in this scope.
> Before version 1.1.0, we make the consumer poll and commit  after subscribe 
> as below, this can mark the offset to 0 and works (enable.auto.commit is 
> false) .
>  
> {code:java}
> consumer.subscribe(topics, consumerRebalanceListener);
> if(consumer.assignment().isEmpty()) {
>    consumer.poll(0);
>    consumer.commitSync();
> }
> {code}
> After upgrade the clients to >=1.1.0,  it is broke. Seems it was broke by the 
> fix 
> [KAFKA-6397|https://github.com/apache/kafka/commit/677881afc52485aef94150be50d6258d7a340071#diff-267b7c1e68156c1301c56be63ae41dd0],
>  but I am not sure about that.  Then I try to invoke the 
> consumer.position(partitions) in onPartitionsAssigned of 
> ConsumerRebalanceListener,  it works again. but it looks strangely that get 
> the position but do nothing.  
>  
> so we want to know whether there is a formal way to do this, maybe introduce 
> another stategy for auto.offset.reset to only consume the message after  the 
> consumer subscribing is perfect.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to