Jason Gustafson created KAFKA-12207:
---------------------------------------
Summary: Do not maintain list of latest producer append
information
Key: KAFKA-12207
URL: https://issues.apache.org/jira/browse/KAFKA-12207
Project: Kafka
Issue Type: Bug
Reporter: Jason Gustafson
For each producerId writing to each partition, we maintain a list of the 5 most
recent appended sequence numbers and the corresponding offsets in the log. If a
producer fails to receive a successful response and retries the Produce
request, then we can still return the offset of the successful append, which is
returned to the user inside `RecordMetadata`. (Note that the limit of 5 most
recent appends is where we derive the limit on the max number of inflight
requests that the producer is allowed when idempotence is enabled.)
This is only a "best-effort" attempt to return the offset of the append. For
example, we do not populate the full list of recent appends when the log is
reloaded. Only the latest sequence/offset are reloaded from the snapshot. If we
receive a duplicate and we do not have the offset, then the broker currently
handles this by returning OUT_OF_ORDER_SEQUENCE.
In fact, we have a separate error DUPLICATE_SEQUENCE_NUMBER which was intended
to handle this case and the producer already checks for it. If the producer
sees this error in the response, then the `send` is considered successful, but
the producer returns -1 as both the offset and timestamp inside
`RecordMetadata`.
The reason we never implemented this on the broker is probably because we allow
the sequence numbers of the producer to wrap around after reaching
Int.MaxValue. What we considered in the past is fixing a number like 1000 and
requiring that the sequence be within that range to be considered a duplicate.
A better solution going forward is to let the producer bump the epoch when the
sequence hits Int.MaxValue, but we still have to allow sequence numbers to wrap
for compatibility.
Given the loose guarantees that we already have here, I'm considering whether
the additional bookkeeping and the required memory are worth preserving. As an
alternative, we could consider the following:
1. The broker will only maintain the latest sequence/offset for each producerId
2. We will return DUPLICATE_SEQUENCE_NUMBER for any sequence that is within
1000 of the latest sequence (accounting for overflow).
3. Instead of wrapping around sequence numbers, the producer will bump the
epoch if possible. It's worth noting that the idempotent producer can freely
bump the epoch, so the only time we should ever need to wrap the sequence is
for the transactional producer when it is used on a broker which does not
support the `InitProducerId` version which allows epoch bumps.
4. We can remove the restriction on `max.in.flight.requests.per.connection` and
document that if the offset is required in `RecordMetadata`, then the user must
set this to 1. Internally, if connecting to an old broker which does not
support epoch bumps, then we can restrict the number of inflight requests to 5.
The benefit in the end is that we can reduce the memory usage for producer
state and the complexity to manage that state. It also gives us a path to
removing the annoying config restriction and a better policy for sequence
overflow.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)