I'm trying to implement an error-handling IPartitionedTridentSpout that limits 
the number of retries of a batch. The problem I've got is the interaction 
between the coordinator and emitter.

The spout reads from a Kafka queue. If no messages have been put on the queue 
recently, the coordinator returns false from isReady. When there are messages, 
the coordinator returns true and lets the emitter read them from the queue. 
This leads to some rather interesting code in which the coordinator has to try 
and guess which offset the emitter is currently reading from, but that's beside 
the point.

The problem arises when a batch is retried. When a batch is retried, 
isReady is called with the txid to be replayed. In this situation, the 
coordinator must always return true, even if no more messages are on the queue, 
as the emitter is reading messages already on the queue from a previous batch. 
But the coordinator has no way of knowing if the txid passed to isReady() is a 
replay or a new transaction, as the txid is a raw long, not a 
TransactionAttempt.
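
To make the problem concrete, here's a stripped-down sketch of what my 
coordinator effectively does. It has no Storm or Kafka dependencies: FakeQueue, 
GatingCoordinator and guessedEmitterOffset are made-up names for illustration 
only, and the only thing taken from Trident is the isReady(long txid) shape.

```java
import java.util.concurrent.atomic.AtomicLong;

public class IsReadyReplayProblem {

    /** Hypothetical stand-in for the Kafka queue: only tracks the latest offset. */
    static class FakeQueue {
        private final AtomicLong latestOffset = new AtomicLong(0);

        void publish(int count) { latestOffset.addAndGet(count); }

        long latestOffset() { return latestOffset.get(); }
    }

    /** Coordinator that lets a batch through only when new messages have arrived. */
    static class GatingCoordinator {
        private final FakeQueue queue;
        // The coordinator's guess at how far the emitter has read (an assumption).
        private long guessedEmitterOffset = 0;

        GatingCoordinator(FakeQueue queue) { this.queue = queue; }

        // Same shape as the coordinator's isReady: only a raw txid is available.
        boolean isReady(long txid) {
            long latest = queue.latestOffset();
            if (latest > guessedEmitterOffset) {
                // Assume the emitter will consume everything up to 'latest'.
                guessedEmitterOffset = latest;
                return true;
            }
            // Nothing new on the queue. A replayed txid ends up here too,
            // because nothing in the argument marks it as a replay.
            return false;
        }
    }

    public static void main(String[] args) {
        FakeQueue queue = new FakeQueue();
        GatingCoordinator coordinator = new GatingCoordinator(queue);

        queue.publish(10);
        System.out.println("txid 1, first attempt: " + coordinator.isReady(1L)); // true
        // Batch 1 fails downstream and isReady is called again with the same txid.
        System.out.println("txid 1, replay: " + coordinator.isReady(1L));        // false
    }
}
```

The second call returns false even though the emitter would only be re-reading 
messages that are already on the queue.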

So either the emitter does not replay a batch until some new messages are put 
on the queue, or the coordinator simply returns true in every call to isReady 
and the Kafka server gets hammered (and lots of txs go through the system 
unnecessarily). Both solutions are bad.

What's the solution to this? Do the emitter and coordinator need their own 
method of communication other than the interfaces provided by trident? Are the 
emitter and coordinator guaranteed to always run on the same worker, so I can 
use an in-memory static queue or something for extra coordinator <-> emitter 
communication? Or am I misunderstanding the meaning of isReady()?

It would be really really helpful if the coordinator could be passed a 
TransactionAttempt to isReady rather than a raw long...
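
Roughly what I have in mind, as a sketch only. The isReady(Attempt) signature 
below is NOT part of the Trident coordinator interface, Attempt is a made-up 
stand-in for TransactionAttempt, and I'm assuming the attempt id is 0 for the 
first attempt and goes up on each replay.

```java
public class AttemptAwareCoordinator {

    /** Hypothetical stand-in for TransactionAttempt: txid plus attempt number. */
    static final class Attempt {
        final long txid;
        final int attemptId; // assumption: 0 on the first attempt, higher on replays

        Attempt(long txid, int attemptId) {
            this.txid = txid;
            this.attemptId = attemptId;
        }
    }

    private long latestOffset = 0;  // latest offset known to be on the queue
    private long claimedOffset = 0; // offset already handed out to earlier batches

    /** Hypothetical hook simulating new messages arriving on the queue. */
    void onMessagesPublished(int count) { latestOffset += count; }

    /** Hypothetical signature: the real coordinator only receives a raw long txid. */
    boolean isReady(Attempt attempt) {
        if (attempt.attemptId > 0) {
            // Replay: the messages for this batch are already on the queue.
            return true;
        }
        if (latestOffset > claimedOffset) {
            // New transaction and new messages are available.
            claimedOffset = latestOffset;
            return true;
        }
        // New transaction but nothing new to read, so don't poll Kafka.
        return false;
    }

    public static void main(String[] args) {
        AttemptAwareCoordinator coordinator = new AttemptAwareCoordinator();
        coordinator.onMessagesPublished(5);
        System.out.println(coordinator.isReady(new Attempt(1L, 0))); // true
        System.out.println(coordinator.isReady(new Attempt(1L, 1))); // true (replay)
        System.out.println(coordinator.isReady(new Attempt(2L, 0))); // false
    }
}
```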

Thanks,
SimonC
