[ 
https://issues.apache.org/jira/browse/STORM-3279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16687650#comment-16687650
 ] 

Stig Rohde Døssing edited comment on STORM-3279 at 11/15/18 8:43 AM:
---------------------------------------------------------------------

I took a look, and it looks like we did pretty similar things. One possible 
issue with your solution: if the emitter is configured to seek to LATEST and 
it emits 0 tuples, the next attempt to emit will seek to LATEST again. If the 
producer is sufficiently slow, the spout might never emit anything.

Edit: No wait, I'm wrong. It looks fine.



> Kafka trident spout could lose its position with EARLIEST or LATEST 
> FirstPollOffsetStrategy
> --------------------------------------------------------------------------------------------
>
>                 Key: STORM-3279
>                 URL: https://issues.apache.org/jira/browse/STORM-3279
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: trident
>    Affects Versions: 2.0.1
>            Reporter: Janith Kaiprath Valiyalappil
>            Assignee: Janith Kaiprath Valiyalappil
>            Priority: Major
>              Labels: kafka, trident
>
> In the emitPartitionBatch() function of KafkaTridentSpoutEmitter, when 
> kafkaConsumer.poll(pollTimeoutMs) returns 0 records for the very first 
> transaction and FirstPollOffsetStrategy is set to EARLIEST or LATEST, the 
> spout fails to move to EARLIEST or LATEST and instead continues from the 
> last metadata position.
>  
> The flow of events that would cause this bug:
>  
> 1. FirstPollOffsetStrategy is set to EARLIEST or LATEST.
> 2. For the first transaction after restart (txid1), based on 
> [link L164|https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java#L164],
> currentBatch is initialized to lastBatchMeta (which need not be null).
> 3. Later, at L171, the consumer seeks to "start" or "end".
> 4. Then consumer.poll(pollTimeoutMs) is called.
> 5. If poll returns one or more records, currentBatch is set to new metadata. 
> *If poll returns 0 records, currentBatch is not reset, i.e., currentBatch is 
> still lastBatchMeta (which need not be null).*
>  
> So in transaction txid2, which follows txid1, isFirstPoll() returns false and 
> the spout continues from lastBatchMeta instead of from EARLIEST or LATEST.
>  
>  
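
For anyone following along, the flow in the description can be reproduced with a small toy model. This is only a sketch under stated assumptions, not the actual KafkaTridentSpoutEmitter code: FirstPollSketch, BatchMeta, FakeConsumer, Emitter and the firstPoll flag are hypothetical stand-ins, the strategy is fixed to LATEST, and the offsets (9, 100, 105) are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the flow in the issue description; not the real Storm classes. */
public class FirstPollSketch {

    /** Hypothetical batch metadata: the last offset covered by a batch. */
    static final class BatchMeta {
        final long lastOffset;
        BatchMeta(long lastOffset) { this.lastOffset = lastOffset; }
    }

    /** Hypothetical single-partition consumer; offsets 0 .. endOffset-1 exist. */
    static final class FakeConsumer {
        long endOffset;
        long position; // next offset poll() will return

        FakeConsumer(long endOffset) { this.endOffset = endOffset; }

        void seek(long offset) { position = offset; }
        void seekToEnd() { position = endOffset; }

        /** Returns every offset from the current position up to the log end. */
        List<Long> poll() {
            List<Long> records = new ArrayList<>();
            while (position < endOffset) {
                records.add(position++);
            }
            return records;
        }
    }

    /** Hypothetical emitter mirroring steps 2 to 5 of the description. */
    static final class Emitter {
        final FakeConsumer consumer;
        boolean firstPoll = true;
        List<Long> lastEmitted = new ArrayList<>();

        Emitter(FakeConsumer consumer) { this.consumer = consumer; }

        BatchMeta emitBatch(BatchMeta lastBatchMeta) {
            BatchMeta currentBatch = lastBatchMeta;          // step 2
            if (firstPoll) {
                consumer.seekToEnd();                        // step 3 (LATEST)
                firstPoll = false;
            } else if (lastBatchMeta != null) {
                consumer.seek(lastBatchMeta.lastOffset + 1); // resume from metadata
            }
            lastEmitted = consumer.poll();                   // step 4
            if (!lastEmitted.isEmpty()) {                    // step 5
                currentBatch = new BatchMeta(lastEmitted.get(lastEmitted.size() - 1));
            }
            return currentBatch; // unchanged when poll returned 0 records
        }
    }

    /** Runs txid1 (empty poll) and txid2, and returns the first offset txid2 emits. */
    public static long firstOffsetOfSecondTransaction() {
        FakeConsumer consumer = new FakeConsumer(100);   // log holds offsets 0..99
        Emitter emitter = new Emitter(consumer);
        BatchMeta beforeRestart = new BatchMeta(9);      // stale metadata from before restart

        BatchMeta afterTxid1 = emitter.emitBatch(beforeRestart); // seeks to 100, 0 records
        consumer.endOffset = 105;                        // producer writes offsets 100..104
        emitter.emitBatch(afterTxid1);                   // txid2 resumes from offset 10
        return emitter.lastEmitted.get(0);
    }

    public static void main(String[] args) {
        System.out.println(firstOffsetOfSecondTransaction()); // prints 10
    }
}
```

In this model txid2 starts at offset 10 (one past the stale metadata) rather than at offset 100, where the seek to LATEST had placed the consumer, which is the lost position the description refers to.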



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
