[ https://issues.apache.org/jira/browse/KAFKA-4845?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ismael Juma updated KAFKA-4845:
-------------------------------
    Description: 
When integrating with Spark Streaming, the Kafka consumer cannot get the latest offsets except for one partition. The code snippet is as follows:
{code}
protected def latestOffsets(): Map[TopicPartition, Long] = {
  val c = consumer
  c.poll(0)
  val parts = c.assignment().asScala
  val newPartitions = parts.diff(currentOffsets.keySet)
  currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> c.position(tp)).toMap
  c.pause(newPartitions.asJava)
  c.seekToEnd(currentOffsets.keySet.asJava)
  parts.map(tp => tp -> c.position(tp)).toMap
}
{code}
Calling consumer.position(topicPartition) calls updateFetchPositions(Collections.singleton(partition)).
The bug lies in updateFetchPositions(Set<TopicPartition> partitions):
{code}
fetcher.resetOffsetsIfNeeded(partitions);     // reset to latest offset for current partition
if (!subscriptions.hasAllFetchPositions()) {  // seekToEnd was called for all partitions before, so this condition is true
    coordinator.refreshCommittedOffsetsIfNeeded();
    fetcher.updateFetchPositions(partitions); // reset to committed offsets for current partition
}
{code}
So eventually only one partition (the last partition in the assignment) gets the latest offset, while all the others get the committed offset.

> KafkaConsumer.seekToEnd cannot take effect when integrating with spark streaming
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-4845
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4845
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 0.10.1.0, 0.10.1.1, 0.10.2.0
>            Reporter: Dan

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
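
The interaction described in this issue can be followed in a small stand-alone model. This is only a sketch under stated assumptions: the class SeekToEndModel, the string partition names and all offset values are hypothetical, and the second step simply seeks to the committed offset, as the issue text describes. It does not use or reproduce Kafka's actual client code.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model of the behaviour described in this issue. It is NOT Kafka's code:
// partitions are plain strings and all offsets are made-up values.
class SeekToEndModel {
    private final Map<String, Long> logEnd = new LinkedHashMap<>();
    private final Map<String, Long> committed = new LinkedHashMap<>();
    private final Map<String, Long> positions = new LinkedHashMap<>(); // null = no valid position
    private final Set<String> awaitingReset = new HashSet<>();

    void assign(String tp, long committedOffset, long endOffset) {
        committed.put(tp, committedOffset);
        logEnd.put(tp, endOffset);
        positions.put(tp, committedOffset);
    }

    // Lazy, as in the issue: only marks the partitions; nothing is resolved yet.
    void seekToEnd(Collection<String> partitions) {
        for (String tp : partitions) {
            awaitingReset.add(tp);
            positions.put(tp, null);
        }
    }

    boolean hasAllFetchPositions() {
        return !positions.containsValue(null);
    }

    void updateFetchPositions(Set<String> partitions) {
        // Step 1: reset the requested partitions to the latest offset.
        for (String tp : partitions) {
            if (awaitingReset.remove(tp)) {
                positions.put(tp, logEnd.get(tp));
            }
        }
        // Step 2: the check covers ALL assigned partitions, but the overwrite
        // below hits only the requested ones, undoing step 1 for them.
        if (!hasAllFetchPositions()) {
            for (String tp : partitions) {
                positions.put(tp, committed.get(tp));
            }
        }
    }

    long position(String tp) {
        if (positions.get(tp) == null) {
            updateFetchPositions(Collections.singleton(tp));
        }
        return positions.get(tp);
    }

    static Map<String, Long> run() {
        SeekToEndModel c = new SeekToEndModel();
        c.assign("t-0", 10L, 100L);
        c.assign("t-1", 20L, 200L);
        c.assign("t-2", 30L, 300L);
        List<String> parts = Arrays.asList("t-0", "t-1", "t-2");
        c.seekToEnd(parts);
        Map<String, Long> result = new LinkedHashMap<>();
        for (String tp : parts) {
            result.put(tp, c.position(tp));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints {t-0=10, t-1=20, t-2=300}
    }
}
```

In this model, t-0 and t-1 end up at their committed offsets because other partitions still lack a position when they are queried. Only t-2, queried last, keeps the log-end offset, which matches the symptom reported above.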