Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/2492
  
    This breaks the semantics of transactional spouts. Please see 
http://storm.apache.org/releases/1.1.1/Trident-spouts.html and the sections 
about transactional and opaque spouts here 
http://storm.apache.org/releases/1.1.1/Trident-state.html. The latter link 
describes exactly your case in the example for why transactional spouts should 
not always be used.
    
    > You might be wondering – why wouldn't you just always use a 
transactional spout? They're simple and easy to understand. One reason you 
might not use one is because they're not necessarily very fault-tolerant. For 
example, the way TransactionalTridentKafkaSpout works is the batch for a txid 
will contain tuples from all the Kafka partitions for a topic. Once a batch has 
been emitted, any time that batch is re-emitted in the future the exact same 
set of tuples must be emitted to meet the semantics of transactional spouts. 
Now suppose a batch is emitted from TransactionalTridentKafkaSpout, the batch 
fails to process, and at the same time one of the Kafka nodes goes down. You're 
now incapable of replaying the same batch as you did before (since the node is 
down and some partitions for the topic are unavailable), and processing 
will halt. 
    
    Basically, transactional (non-opaque) spouts guarantee that a given 
transaction id always corresponds to the same set of tuples. So for example if 
txid = 1 contains tuples [A, B, C], then if txid = 1 is reemitted, it has to 
still contain exactly [A, B, C]. This change will break that guarantee, because 
if e.g. A, B have been deleted from Kafka, we'll hit the new code and txid = 1 
might now contain [C, E, F] instead. 
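    
    To make the consequence concrete, here is a minimal sketch in plain Java 
(no Storm dependencies). `TransactionalCountState` is a hypothetical stand-in 
for a transactional Trident state, not a real Storm class. It assumes the 
update rule from the Trident state docs: if the stored txid equals the batch's 
txid, the update is skipped, because the batch is assumed to be identical to 
the one already applied.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical simulation; none of these names exist in Storm.
class TransactionalReplaySketch {

    // Stand-in for a transactional state: per-tuple counts plus the txid last applied.
    static final class TransactionalCountState {
        final Map<String, Integer> counts = new TreeMap<>();
        long lastTxid = -1;

        void apply(long txid, List<String> batch) {
            // Transactional rule: a txid that was already applied is skipped,
            // relying on the spout to re-emit exactly the same tuples.
            if (txid == lastTxid) {
                return;
            }
            for (String tuple : batch) {
                counts.merge(tuple, 1, Integer::sum);
            }
            lastTxid = txid;
        }
    }

    public static void main(String[] args) {
        TransactionalCountState state = new TransactionalCountState();

        // First attempt of txid = 1 updates the state, then the batch is replayed.
        state.apply(1, Arrays.asList("A", "B", "C"));

        // A and B were deleted from Kafka, so the replay of txid = 1 differs.
        state.apply(1, Arrays.asList("C", "E", "F"));

        // The replay is skipped, so E and F are never counted.
        System.out.println(state.counts); // prints {A=1, B=1, C=1}
    }
}
```

    In this sketch the next batch would start after F, so E and F are silently 
dropped from the state.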
    
    If you need to be able to handle offsets being deleted from Kafka (which is 
what you're trying to do with config.usestarttimeifoffsetoutofrange), you need 
to instead use an Opaque spout 
(https://github.com/apache/storm/blob/master/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/OpaqueTridentKafkaSpout.java).
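
    For comparison, a rough sketch of why opaque state tolerates a replay with 
different contents. Again this is plain Java with hypothetical names 
(`OpaqueCountState` is not a Storm class), assuming the opaque update rule from 
the Trident state docs: the state keeps the previous value alongside the 
current one, and a replay of the same txid is recomputed from the previous 
value rather than skipped.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical simulation; none of these names exist in Storm.
class OpaqueStateSketch {

    // Stand-in for an opaque state: previous value, current value, and last txid.
    static final class OpaqueCountState {
        Map<String, Integer> prev = new TreeMap<>();
        Map<String, Integer> curr = new TreeMap<>();
        long lastTxid = -1;

        void apply(long txid, List<String> batch) {
            if (txid != lastTxid) {
                // New txid: the current value becomes the baseline.
                prev = new TreeMap<>(curr);
            }
            // Same txid: the earlier attempt is discarded by rebuilding from the baseline.
            Map<String, Integer> next = new TreeMap<>(prev);
            for (String tuple : batch) {
                next.merge(tuple, 1, Integer::sum);
            }
            curr = next;
            lastTxid = txid;
        }
    }

    public static void main(String[] args) {
        OpaqueCountState state = new OpaqueCountState();

        // First attempt of txid = 1, which later fails.
        state.apply(1, Arrays.asList("A", "B", "C"));

        // Replay of txid = 1 with different tuples because A and B are gone.
        state.apply(1, Arrays.asList("C", "E", "F"));

        // Only the tuples of the latest attempt are reflected.
        System.out.println(state.curr); // prints {C=1, E=1, F=1}
    }
}
```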

