[ 
https://issues.apache.org/jira/browse/KAFKA-9136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangzhisheng updated KAFKA-9136:
---------------------------------
    Description: 
For example, I have two topics, one_topic and two_topic; each topic has two 
partitions, and consumer group 'c_group' subscribes to these topics. How can I get 
c_group's latest committed timestamp?
{code:java}
/**
 * Attempts to find when a consumer group last committed offsets by reading the
 * {@code __consumer_offsets} topic directly.
 *
 * <p>As written, the method returns the record timestamp of the first record in
 * the polled batch, not the decoded {@code commitTimestamp} and not a value
 * filtered by {@code groupId}.
 *
 * @param groupId          consumer group id; used both to pick the partition of
 *                         {@code __consumer_offsets} and as this consumer's own
 *                         {@code group.id}
 * @param bootstrapServers value for the {@code bootstrap.servers} consumer property
 * @return the timestamp of the first polled record, or {@code -1} when the poll
 *         returns no records within 30 seconds
 */
public static long lastCommitTimestamp(String groupId, String bootstrapServers) 
{
 // Pick the __consumer_offsets partition from the group id hash.
 // NOTE(review): 50 is presumably the broker's offsets.topic.num.partitions
 // default; confirm against the cluster configuration.
 // NOTE(review): the broker appears to map a hashCode of Integer.MIN_VALUE to
 // partition 0, whereas this expression would yield 48; verify.
 int partition = Math.abs(groupId.hashCode() % 50);
 TopicPartition tp = new TopicPartition("__consumer_offsets", partition);
 Properties props = new Properties();
 props.put("bootstrap.servers", bootstrapServers);
 props.put("group.id", groupId);
 props.put("enable.auto.commit", "false");
 // Keys and values are read as raw bytes and decoded by hand further down.
 props.put("key.deserializer", 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
 props.put("value.deserializer", 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
 try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
 consumer.assign(Arrays.asList(tp));
 // NOTE(review): seeking to the end means the poll below should only see
 // records appended after this point, i.e. commits happening right now.
 consumer.seekToEnd(Collections.singletonList(tp));
 ConsumerRecords<byte[], byte[]> records = 
consumer.poll(Duration.ofSeconds(30));
 // Nothing arrived on the partition within the 30 second poll.
 if (records.count() == 0) {
 return -1;
 }

 for (ConsumerRecord<byte[], byte[]> record : records) {
 // Decode the raw key; only offset-commit keys are inspected below.
 BaseKey baseKey = 
GroupMetadataManager.readMessageKey(ByteBuffer.wrap(record.key()));
 if(baseKey instanceof OffsetKey){
 OffsetKey offsetKey = (OffsetKey) baseKey;
 // Only the hard-coded topic name is checked; the key is never compared
 // with groupId, so commits from other groups in this partition match too.
 if("one_topic".equals(offsetKey.key().topicPartition().topic())) {
 // NOTE(review): ByteBuffer.wrap throws NullPointerException for a null
 // array; presumably record.value() can be null for tombstones. Verify.
 OffsetAndMetadata offsetAndMetadata = 
GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(record.value()));
 // commitTimestamp is assigned here but never read or returned.
 long commitTimestamp = offsetAndMetadata.commitTimestamp();
 //why commitTimestamp is current timestamp
 // NOTE(review): likely a consequence of seekToEnd above: every record
 // polled was presumably written moments ago. Confirm against the
 // KafkaConsumer documentation.
 }
 }
 }{code}
// Returns the record timestamp of the first record in the batch, regardless of
// what the loop above decoded.
return records.iterator().next().timestamp();
 }
}

  was:For example, I have two topics, one_topic and two_topic; each topic has two 
partitions, and consumer group 'c_group' subscribes to these topics. How can I get 
c_group's latest committed timestamp?


>  get consumer latest committed timestamp
> ----------------------------------------
>
>                 Key: KAFKA-9136
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9136
>             Project: Kafka
>          Issue Type: Wish
>          Components: consumer
>    Affects Versions: 2.3.0
>            Reporter: zhangzhisheng
>            Priority: Major
>
> For example, I have two topics, one_topic and two_topic; each topic has two 
> partitions, and consumer group 'c_group' subscribes to these topics. How can I 
> get c_group's latest committed timestamp?
> {code:java}
> public static long lastCommitTimestamp(String groupId, String 
> bootstrapServers) {
>  int partition = Math.abs(groupId.hashCode() % 50);
>  TopicPartition tp = new TopicPartition("__consumer_offsets", partition);
>  Properties props = new Properties();
>  props.put("bootstrap.servers", bootstrapServers);
>  props.put("group.id", groupId);
>  props.put("enable.auto.commit", "false");
>  props.put("key.deserializer", 
> "org.apache.kafka.common.serialization.ByteArrayDeserializer");
>  props.put("value.deserializer", 
> "org.apache.kafka.common.serialization.ByteArrayDeserializer");
>  try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
>  consumer.assign(Arrays.asList(tp));
>  consumer.seekToEnd(Collections.singletonList(tp));
>  ConsumerRecords<byte[], byte[]> records = 
> consumer.poll(Duration.ofSeconds(30));
>  if (records.count() == 0) {
>  return -1;
>  }
>  for (ConsumerRecord<byte[], byte[]> record : records) {
>  BaseKey baseKey = 
> GroupMetadataManager.readMessageKey(ByteBuffer.wrap(record.key()));
>  if(baseKey instanceof OffsetKey){
>  OffsetKey offsetKey = (OffsetKey) baseKey;
>  if("one_topic".equals(offsetKey.key().topicPartition().topic())) {
>  OffsetAndMetadata offsetAndMetadata = 
> GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(record.value()));
>  long commitTimestamp = offsetAndMetadata.commitTimestamp();
>  //why commitTimestamp is current timestamp
>  }
>  }
>  }{code}
> return records.iterator().next().timestamp();
>  }
> }



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to