[
https://issues.apache.org/jira/browse/KAFKA-4845?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Vahid Hashemian reassigned KAFKA-4845:
--------------------------------------
Assignee: Vahid Hashemian
> KafkaConsumer.seekToEnd cannot take effect when integrating with spark
> streaming
> --------------------------------------------------------------------------------
>
> Key: KAFKA-4845
> URL: https://issues.apache.org/jira/browse/KAFKA-4845
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 0.10.1.0, 0.10.1.1, 0.10.2.0
> Reporter: Dan
> Assignee: Vahid Hashemian
>
> When integrating with Spark Streaming, the Kafka consumer cannot get the
> latest offsets for any partition except one. The code snippet is as follows:
> {code}
> protected def latestOffsets(): Map[TopicPartition, Long] = {
>   val c = consumer
>   c.poll(0)
>   val parts = c.assignment().asScala
>   val newPartitions = parts.diff(currentOffsets.keySet)
>   currentOffsets = currentOffsets ++
>     newPartitions.map(tp => tp -> c.position(tp)).toMap
>   c.pause(newPartitions.asJava)
>   c.seekToEnd(currentOffsets.keySet.asJava)
>   parts.map(tp => tp -> c.position(tp)).toMap
> }
> {code}
> When consumer.position(topicPartition) is called, it in turn calls
> updateFetchPositions(Collections.singleton(partition)).
> The bug lies in updateFetchPositions(Set<TopicPartition> partitions):
> {code}
> // resets to the latest offset, but only for the current partition
> fetcher.resetOffsetsIfNeeded(partitions);
> // seekToEnd was called for all partitions before, so this condition is true
> if (!subscriptions.hasAllFetchPositions()) {
>     coordinator.refreshCommittedOffsetsIfNeeded();
>     // resets the remaining partitions back to their committed offsets
>     fetcher.updateFetchPositions(partitions);
> }
> {code}
> So eventually only one partition (the last one in the assignment) gets the
> latest offset, while all the other partitions get the committed offset.
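> As a possible workaround (a sketch only, not a verified fix): instead of
> calling seekToEnd and then reading the positions back one partition at a
> time, the latest offsets could be queried directly with
> KafkaConsumer.endOffsets, which is available in the affected versions
> (since 0.10.1.0) and does not modify the consumer's fetch positions, so the
> single-partition updateFetchPositions path is never triggered:
> {code}
> // Sketch of a workaround: query end offsets directly instead of seeking
> // to the end and reading back position() per partition.
> protected def latestOffsets(): Map[TopicPartition, Long] = {
>   val c = consumer
>   c.poll(0)
>   val parts = c.assignment().asScala
>   val newPartitions = parts.diff(currentOffsets.keySet)
>   currentOffsets = currentOffsets ++
>     newPartitions.map(tp => tp -> c.position(tp)).toMap
>   c.pause(newPartitions.asJava)
>   // endOffsets fetches the log-end offsets from the brokers without
>   // moving the consumer, so every partition gets its true latest offset
>   c.endOffsets(parts.asJava).asScala
>     .map { case (tp, off) => tp -> off.longValue() }.toMap
> }
> {code}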
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)