I'm trying to implement an error-handling IPartitionedTridentSpout that limits the number of retries of a batch. The problem I've got is the interaction between the coordinator and emitter.
The spout reads from a Kafka queue. If no messages have been put on the queue recently, the coordinator returns false from isReady(). When there are messages, the coordinator returns true and lets the emitter read them. This leads to some rather interesting code in which the coordinator has to guess which offset the emitter is currently reading from, but that's beside the point.

The problem arises when a batch is retried. In that case, isReady() is called with the txid to be replayed, and the coordinator must always return true, even if there are no new messages on the queue, because the emitter will be re-reading messages already on the queue from a previous batch. But the coordinator has no way of knowing whether the txid passed to isReady() is a replay or a new transaction, as the txid is a raw long, not a TransactionAttempt.

So either the emitter does not replay a batch until some new messages are put on the queue, or the coordinator simply returns true on every call to isReady() and the Kafka server gets hammered (and lots of transactions go through the system unnecessarily). Both options are bad.

What's the solution to this? Do the emitter and coordinator need their own method of communication, beyond the interfaces provided by Trident? Are the emitter and coordinator guaranteed to always run on the same worker, so that I can use an in-memory static queue or something similar for extra coordinator <-> emitter communication? Or am I misunderstanding the meaning of isReady()?

It would be really helpful if isReady() were passed a TransactionAttempt rather than a raw long...

Thanks,
SimonC
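
P.S. To make the problem concrete, here is a stripped-down sketch of the coordinator logic. It is not the real spout: NaiveCoordinator and the newMessagesAvailable probe are made-up names standing in for my coordinator and its Kafka check, and only the isReady(long txid) shape is meant to mirror what Trident calls.

```java
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for the coordinator; it has no Storm dependency.
// Only the isReady(long txid) signature mirrors the Trident hook.
class NaiveCoordinator {
    // Assumed probe: returns true when new messages have arrived on the queue.
    private final BooleanSupplier newMessagesAvailable;

    NaiveCoordinator(BooleanSupplier newMessagesAvailable) {
        this.newMessagesAvailable = newMessagesAvailable;
    }

    // txid is a raw long, so a first attempt and a replay look identical here.
    // The only thing left to base the decision on is the state of the queue.
    boolean isReady(long txid) {
        return newMessagesAvailable.getAsBoolean();
    }

    public static void main(String[] args) {
        boolean[] queueHasNew = { true };
        NaiveCoordinator coordinator = new NaiveCoordinator(() -> queueHasNew[0]);

        // New messages are waiting, so batch 7 is allowed through.
        System.out.println("txid 7, first attempt: " + coordinator.isReady(7L));

        // The emitter has consumed them and nothing new has arrived.
        // If batch 7 now fails, the replay is held back as well.
        queueHasNew[0] = false;
        System.out.println("txid 7, replay: " + coordinator.isReady(7L));
    }
}
```

The second call is the case I can't handle: it returns false, so the replay of batch 7 waits until unrelated new messages show up. Changing isReady to always return true fixes the replay but gives the "hammer Kafka" behaviour instead.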