Github user srdo commented on the issue:
https://github.com/apache/storm/pull/2454
This PR doesn't actually fix the issue reported in STORM-2847. The problem
is that we can't commit offsets for partitions the consumer is not assigned to.
We've previously assumed that the spout was assigned every partition it had
offset managers for, but this isn't true when reactivating a deactivated spout:
at that point the consumer has no assigned partitions when
onPartitionsRevoked is first called.
It doesn't seem ideal that we unconditionally commit offsets for all
partitions we have offset managers for, when the parameter to
onPartitionsRevoked already tells us which partitions were previously assigned.
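To illustrate the point above, here is a minimal sketch of committing only the partitions reported in the onPartitionsRevoked parameter, rather than everything we hold an offset manager for. The names are hypothetical and plain Strings stand in for Kafka's TopicPartition; this is not Storm's actual code.

```java
import java.util.*;

public class OffsetCommitFilter {
    // offsetManagers: partition -> highest committable acked offset.
    // revokedPartitions: the partitions the consumer actually had, as passed
    // to onPartitionsRevoked. On reactivation this collection is empty, so
    // nothing is committed for partitions the consumer isn't assigned to.
    static Map<String, Long> offsetsToCommit(
            Map<String, Long> offsetManagers,
            Collection<String> revokedPartitions) {
        Map<String, Long> toCommit = new HashMap<>();
        for (String partition : revokedPartitions) {
            Long offset = offsetManagers.get(partition);
            if (offset != null) {
                toCommit.put(partition, offset);
            }
        }
        return toCommit;
    }

    public static void main(String[] args) {
        Map<String, Long> managers = Map.of("topic-0", 5L, "topic-1", 3L);
        // Reactivation case: empty previous assignment, nothing to commit.
        System.out.println(offsetsToCommit(managers, List.of()));
        // Normal rebalance: commit only the revoked partition's offset.
        System.out.println(offsetsToCommit(managers, List.of("topic-0")));
    }
}
```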
I'll close this and reopen once the issue is fixed. My immediate ideas
would be these:
* Instantiate the KafkaConsumer in open() and keep the same consumer for the
lifetime of the spout. The benefit is that this is easy to "get right". The
drawback is that if you deactivate a spout in order to e.g. update a committed
offset with the scripts Kafka ships with, the spout will resume from where it
was when you activate it, not from the new committed offset. Maybe it is
better if deactivate/activate behaves the same as a spout restart (e.g. as it
would if the worker crashed). Also, the consumer remains open while the spout
isn't running.
* Wipe internal spout state when deactivating, including the OffsetManagers.
The benefit is that the spout behaves as if it had been restarted when
reactivated. The drawback is that some acked tuples that couldn't be committed
when the spout was deactivated may be unnecessarily replayed in at-least-once
mode, e.g. if offsets 0-2 and 4-5 had been acked before deactivating, 4-5
couldn't be committed and would be replayed.
* Do a quick hack and make ManualPartitionSubscription only call
onPartitionsRevoked if the previous assignment wasn't empty.
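The quick-hack option could look roughly like the following sketch. The class and method names are illustrative, not Storm's ManualPartitionSubscription API, and plain Strings stand in for TopicPartition; the only point is the emptiness guard around the revoke callback.

```java
import java.util.*;

public class ManualPartitionAssigner {
    private Set<String> currentAssignment = new HashSet<>();
    final List<String> revokedLog = new ArrayList<>();

    // On each reassignment, fire onPartitionsRevoked only when there was a
    // previous non-empty assignment. This skips the callback on the first
    // assignment after (re)activation, when there is nothing to revoke.
    void reassign(Set<String> newAssignment) {
        if (!currentAssignment.isEmpty()) {
            onPartitionsRevoked(currentAssignment);
        }
        currentAssignment = newAssignment;
        onPartitionsAssigned(newAssignment);
    }

    void onPartitionsRevoked(Set<String> revoked) { revokedLog.addAll(revoked); }
    void onPartitionsAssigned(Set<String> assigned) { /* start fetching */ }

    public static void main(String[] args) {
        ManualPartitionAssigner assigner = new ManualPartitionAssigner();
        assigner.reassign(Set.of("topic-0")); // first assignment: no revoke
        assigner.reassign(Set.of("topic-1")); // revokes topic-0
        System.out.println(assigner.revokedLog);
    }
}
```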
I'd appreciate your thoughts on which of these proposals you prefer (or if
you have other ideas). Sorry about the partial PR.
---