[ https://issues.apache.org/jira/browse/KAFKA-9136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
zhangzhisheng updated KAFKA-9136: --------------------------------- Description: For example, I have two topics, one_topic and two_topic; each topic has two partitions, and consumer group 'c_group' subscribes to these topics. How can I get c_group's latest committed timestamp? {code:java} public static long lastCommitTimestamp(String groupId, String bootstrapServers) { int partition = Math.abs(groupId.hashCode() % 50); TopicPartition tp = new TopicPartition("__consumer_offsets", partition); Properties props = new Properties(); props.put("bootstrap.servers", bootstrapServers); props.put("group.id", groupId); props.put("enable.auto.commit", "false"); props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) { consumer.assign(Arrays.asList(tp)); consumer.seekToEnd(Collections.singletonList(tp)); ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(30)); if (records.count() == 0) { return -1; } for (ConsumerRecord<byte[], byte[]> record : records) { BaseKey baseKey = GroupMetadataManager.readMessageKey(ByteBuffer.wrap(record.key())); if(baseKey instanceof OffsetKey){ OffsetKey offsetKey = (OffsetKey) baseKey; if("one_topic".equals(offsetKey.key().topicPartition().topic())) { OffsetAndMetadata offsetAndMetadata = GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(record.value())); long commitTimestamp = offsetAndMetadata.commitTimestamp(); //why commitTimestamp is current timestamp } } }{code} return records.iterator().next().timestamp(); } } was: For example, I have two topics, one_topic and two_topic; each topic has two partitions, and consumer group 'c_group' subscribes to these topics. How can I get c_group's latest committed timestamp?
> get consumer latest committed timestamp > --------------------------------------- > > Key: KAFKA-9136 > URL: https://issues.apache.org/jira/browse/KAFKA-9136 > Project: Kafka > Issue Type: Wish > Components: consumer > Affects Versions: 2.3.0 > Reporter: zhangzhisheng > Priority: Major > > For example, I have two topics, one_topic and two_topic; each topic has two > partitions, and consumer group 'c_group' subscribes to these topics. How can I get > c_group's latest committed timestamp? > {code:java} > public static long lastCommitTimestamp(String groupId, String > bootstrapServers) { > int partition = Math.abs(groupId.hashCode() % 50); > TopicPartition tp = new TopicPartition("__consumer_offsets", partition); > Properties props = new Properties(); > props.put("bootstrap.servers", bootstrapServers); > props.put("group.id", groupId); > props.put("enable.auto.commit", "false"); > props.put("key.deserializer", > "org.apache.kafka.common.serialization.ByteArrayDeserializer"); > props.put("value.deserializer", > "org.apache.kafka.common.serialization.ByteArrayDeserializer"); > try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) { > consumer.assign(Arrays.asList(tp)); > consumer.seekToEnd(Collections.singletonList(tp)); > ConsumerRecords<byte[], byte[]> records = > consumer.poll(Duration.ofSeconds(30)); > if (records.count() == 0) { > return -1; > } > for (ConsumerRecord<byte[], byte[]> record : records) { > BaseKey baseKey = > GroupMetadataManager.readMessageKey(ByteBuffer.wrap(record.key())); > if(baseKey instanceof OffsetKey){ > OffsetKey offsetKey = (OffsetKey) baseKey; > if("one_topic".equals(offsetKey.key().topicPartition().topic())) { > OffsetAndMetadata offsetAndMetadata = > GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(record.value())); > long commitTimestamp = offsetAndMetadata.commitTimestamp(); > //why commitTimestamp is current timestamp > } > } > }{code} > return records.iterator().next().timestamp(); > } > } -- This message was sent by
Atlassian Jira (v8.3.4#803005)