[ 
https://issues.apache.org/jira/browse/STORM-2720?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Janith Kaiprath Valiyalappil updated STORM-2720:
------------------------------------------------
    Description: 
Offsets for a given partition at a particular timestamp can now be found using 
offsetsForTimes API. 
https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#offsetsForTimes(java.util.Map).
Adding this to the 

  was:
The storm kakfa trident spout uses the KafkaTridentSpoutTopicPartitionRegistry, 
to get partition information. The coordinator calls the getTopicPartitions() 
method to get partition information and passes it to the emitters. But this 
partition information will not be accurate as all instances of 
KafkaTridentSpoutTopicPartitionRegistry will not be updated with full partition 
information.

The update to the registry is done when the consumer subscribes using 
KafkaSpoutConsumerRebalanceListener. This calls the 
KafkaTridentSpoutTopicPartitionRegistry.INSTANCE.addAll(partitions); These 
calls would only update the registry in that particular worker with partition 
information for consumers in that worker.

So when the coordinator calls the getOrderedPartitions() and passes it to each 
emitter by calling getOrderedPartitions(), the full partition information will 
not be present. The only probable case this would work is if the emitters and 
coordinators were on the same worker.


> Support timestamp based FirstPollOffsetStrategy to KafkaTridentSpoutOpaque
> --------------------------------------------------------------------------
>
>                 Key: STORM-2720
>                 URL: https://issues.apache.org/jira/browse/STORM-2720
>             Project: Apache Storm
>          Issue Type: Improvement
>          Components: storm-kafka-client, trident
>    Affects Versions: 1.1.1
>            Reporter: Janith Kaiprath Valiyalappil
>            Priority: Minor
>
> Offsets for a given partition at a particular timestamp can now be found 
> using offsetsForTimes API. 
> https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#offsetsForTimes(java.util.Map).
> Adding this to the 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to