[ https://issues.apache.org/jira/browse/KAFKA-6470?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
chao.wu updated KAFKA-6470:
---------------------------
    Description:
The topic-partition "adn-tracking,15" in Kafka has an earliest offset of 1255644602 and a latest offset of 1271253441.
While starting a Spark Streaming job to process the data from the topic, we got an exception with the message "Got wrong record XXXX even after seeking to offset 1266921577".
I implemented a simple project that uses a consumer to seek to offset 1266921577, but it returns offset 1266921578. When seeking to 1266921576, it returns exactly 1266921576.
Why? How can this be fixed?

Here is the code:

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class consumerDemo {
    public static void main(String[] argv) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.31.29.31:9091");
        props.put("group.id", "consumer-tutorial-demo");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        // Close the consumer when done so its network resources are released.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props)) {
            // Assign the single partition manually instead of subscribing.
            TopicPartition tp = new TopicPartition("adn-tracking-click", 15);
            Collection<TopicPartition> collection = new ArrayList<TopicPartition>();
            collection.add(tp);
            consumer.assign(collection);

            // Offset under test; the description above also tries 1266921577.
            consumer.seek(tp, 1266921576L);
            ConsumerRecords<String, String> consumerRecords = consumer.poll(10000);
            List<ConsumerRecord<String, String>> listR = consumerRecords.records(tp);
            Iterator<ConsumerRecord<String, String>> iter = listR.iterator();

            // poll() may return nothing within the timeout, so guard iter.next().
            if (iter.hasNext()) {
                ConsumerRecord<String, String> record = iter.next();
                System.out.println(" the next record " + record.offset() + " record topic " + record.topic());
            } else {
                System.out.println(" no record returned within the poll timeout");
            }
        }
    }
}

> no continuous offset for function seek
> --------------------------------------
>
>                 Key: KAFKA-6470
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6470
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, core
>    Affects Versions: 0.10.0.1
>            Reporter: chao.wu
>            Priority: Major
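One possible reading of the reported behaviour, offered as an assumption and not as a diagnosis confirmed anywhere in this ticket: a fetch returns the first record whose offset is greater than or equal to the sought position, so if offset 1266921577 no longer exists in the log, the first record returned would be 1266921578. The sketch below models only that lookup rule on a hypothetical in-memory list of offsets; the class name SeekGapSketch, the method firstOffsetAtOrAfter, and the sample log contents are all made up for illustration and do not touch a real broker.

```java
import java.util.Arrays;

public class SeekGapSketch {
    /**
     * Hypothetical model of a partition: sortedOffsets holds the offsets that
     * still exist in the log, in ascending order. Returns the offset of the
     * first record at or after position, or -1 if there is none.
     */
    public static long firstOffsetAtOrAfter(long[] sortedOffsets, long position) {
        int idx = Arrays.binarySearch(sortedOffsets, position);
        if (idx < 0) {
            // Not found: binarySearch returns -(insertionPoint) - 1.
            idx = -idx - 1;
        }
        return idx < sortedOffsets.length ? sortedOffsets[idx] : -1L;
    }

    public static void main(String[] args) {
        // Assumed log contents: offset 1266921577 is missing (not confirmed by the report).
        long[] log = {1266921575L, 1266921576L, 1266921578L, 1266921579L};
        for (long target : new long[] {1266921576L, 1266921577L}) {
            long got = firstOffsetAtOrAfter(log, target);
            System.out.println("seek " + target + " -> first record " + got
                    + (got == target ? "" : " (gap)"));
        }
    }
}
```

Under that assumption, a caller that requires contiguous offsets would compare the requested position with the offset of the first record returned and treat any difference as a gap instead of an error in seek itself.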