Hey Guozhang,

Thanks for sharing the article. The INVALID_PRODUCER_ID_MAPPING error
occurs following expiration of the producerId. It's possible that another
producerId has been installed in its place following expiration (if another
producer instance has become active), or the mapping is empty. We can
safely retry the InitProducerId with the logic in this KIP in order to
detect which case it is. So I'd suggest something like this:

1. After receiving INVALID_PRODUCER_ID_MAPPING, the producer can send
InitProducerId using the current producerId and epoch.
2. If no mapping exists, the coordinator can generate a new producerId and
return it. If a transaction is in progress on the client, it will have to
be aborted, but the producer can continue afterwards.
3. Otherwise if a different producerId has been assigned, then we can
return INVALID_PRODUCER_ID_MAPPING. To simplify error handling, we can
probably raise this as ProducerFencedException since that is effectively
what has happened. Ideally this is the only fatal case that users have to
handle.

I'll give it a little more thought and update the KIP.

Thanks,
Jason

On Thu, Jan 3, 2019 at 1:38 PM Guozhang Wang <wangg...@gmail.com> wrote:

> You're right about the dangling txn since it will actually block
> read-committed consumers from proceeding at all. I'd agree that since this
> is a very rare case, we can consider fixing it not via broker-side logic
> but via tooling in a future work.
>
> I've also discovered some related error handling logic inside producer that
> may be addressed together with this KIP (since it is mostly for internal
> implementations the wiki itself does not need to be modified):
>
>
> https://stackoverflow.com/questions/53976117/why-did-the-kafka-stream-fail-to-produce-data-after-a-long-time/54029181#54029181
>
> Guozhang
>
>
>
> On Thu, Nov 29, 2018 at 2:25 PM Jason Gustafson <ja...@confluent.io>
> wrote:
>
> > Hey Guozhang,
> >
> > To clarify, the broker does not actually use the ApiVersion API for
> > inter-broker communications. The use of an API and its corresponding
> > version is controlled by `inter.broker.protocol.version`.
> >
> > Nevertheless, it sounds like we're on the same page about removing
> > DescribeTransactionState. The impact of a dangling transaction is a
> little
> > worse than what you describe though. Consumers with the read_committed
> > isolation level will be stuck. Still, I think we agree that this case
> > should be rare and we can reconsider for future work. Rather than
> > preventing dangling transactions, perhaps we should consider options
> which
> > allows us to detect them and recover. Anyway, this needs more thought. I
> > will update the KIP.
> >
> > Best,
> > Jason
> >
> > On Tue, Nov 27, 2018 at 6:51 PM Guozhang Wang <wangg...@gmail.com>
> wrote:
> >
> > > 0. My original question is about the implementation details primarily,
> > > since current the handling logic of the APIVersionResponse is simply
> "use
> > > the highest supported version of the corresponding request", but if the
> > > returned response from APIVersionRequest says "I don't even know about
> > the
> > > DescribeTransactionStateRequest at all", then we need additional logic
> > for
> > > the falling back logic. Currently this logic is embedded in
> NetworkClient
> > > which is shared by all clients, so I'd like to avoid making this logic
> > more
> > > complicated.
> > >
> > > As for the general issue that a broker does not recognize a producer
> with
> > > sequence number 0, here's my thinking: as you mentioned in the wiki,
> this
> > > is only a concern for transactional producer since for idempotent
> > producer
> > > it can just bump the epoch and go. For transactional producer, even if
> > the
> > > producer request from a fenced producer gets accepted, its transaction
> > will
> > > never be committed and hence messages not exposed to read-committed
> > > consumers as well. The drawback is though, 1) read-uncommitted
> consumers
> > > will still read those messages, 2) unnecessary storage for those fenced
> > > produce messages, but in practice should not accumulate to a large
> amount
> > > since producer should soon try to commit and be told it is fenced and
> > then
> > > stop, 3) there will be no markers for those transactional messages
> ever.
> > > Looking at the list and thinking about the likelihood it may happen
> > > assuming we retain the producer up to transactional.id.timeout (default
> > is
> > > 7 days), I feel comfortable leaving it as is.
> > >
> > > Guozhang
> > >
> > > On Mon, Nov 26, 2018 at 6:09 PM Jason Gustafson <ja...@confluent.io>
> > > wrote:
> > >
> > > > Hey Guozhang,
> > > >
> > > > Thanks for the comments. Responses below:
> > > >
> > > > 0. The new API is used between brokers, so we govern its usage using
> > > > `inter.broker.protocol.version`. If the other broker hasn't upgraded,
> > we
> > > > will just fallback to the old logic, which is to accept the write.
> This
> > > is
> > > > similar to how we introduced the OffsetsForLeaderEpoch API. Does that
> > > seem
> > > > reasonable?
> > > >
> > > > To tell the truth, after digging this KIP up and reading it over, I
> am
> > > > doubting how crucial this API is. It is attempting to protect a write
> > > from
> > > > a zombie which has just reset its sequence number after that producer
> > had
> > > > had its state cleaned up. However, one of the other improvements in
> > this
> > > > KIP is to maintain producer state beyond its retention in the log. I
> > > think
> > > > that makes this case sufficiently unlikely that we can leave it for
> > > future
> > > > work. I am not 100% sure this is the only scenario where transaction
> > > state
> > > > and log state can diverge anyway, so it would be better to consider
> > this
> > > > problem more generally. What do you think?
> > > >
> > > > 1. Thanks, from memory, the term changed after the first iteration.
> > I'll
> > > > make a pass and try to clarify usage.
> > > > 2. I was attempting to handle some edge cases since this check would
> be
> > > > asynchronous. In any case, if we drop this validation as suggested
> > above,
> > > > then we can ignore this.
> > > >
> > > > -Jason
> > > >
> > > >
> > > >
> > > > On Tue, Nov 13, 2018 at 6:23 PM Guozhang Wang <wangg...@gmail.com>
> > > wrote:
> > > >
> > > > > Hello Jason, thanks for the great write-up.
> > > > >
> > > > > 0. One question about the migration plan: "The new
> > GetTransactionState
> > > > API
> > > > > and the new version of the transaction state message will not be
> used
> > > > until
> > > > > the inter-broker version supports it." I'm not so clear about the
> > > > > implementation details here: say a broker is on the newer version
> and
> > > the
> > > > > txn-coordinator is still on older version. Today the
> > APIVersionsRequest
> > > > can
> > > > > only help upgrade / downgrade the request version, but not
> forbidding
> > > > > sending any. Are you suggesting we add additional logic on the
> broker
> > > > side
> > > > > to handle the case of "not sending the request"? If yes my concern
> is
> > > > that
> > > > > this will be some tech-debt code that will live long before being
> > > > removed.
> > > > >
> > > > > Some additional minor comments:
> > > > >
> > > > > 1. "last epoch" and "instance epoch" seem to be referring to the
> same
> > > > thing
> > > > > in your wiki.
> > > > > 2. "The broker must verify after receiving the response that the
> > > producer
> > > > > state is still unknown.": not sure why we have to validate? If the
> > > > metadata
> > > > > returned from the txn-coordinator can always be considered the
> > > > > source-of-truth, can't we just bindly use it to update the cache?
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > >
> > > > > On Thu, Sep 6, 2018 at 9:10 PM Matthias J. Sax <
> > matth...@confluent.io>
> > > > > wrote:
> > > > >
> > > > > > I am +1 on this :)
> > > > > >
> > > > > >
> > > > > > -Matthias
> > > > > >
> > > > > > On 9/4/18 9:55 AM, Jason Gustafson wrote:
> > > > > > > Bump. Thanks to Magnus for noticing that I forgot to link to
> the
> > > KIP:
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-360%3A+Improve+handling+of+unknown+producer
> > > > > > > .
> > > > > > >
> > > > > > > -Jason
> > > > > > >
> > > > > > > On Tue, Aug 21, 2018 at 4:37 PM, Jason Gustafson <
> > > ja...@confluent.io
> > > > >
> > > > > > wrote:
> > > > > > >
> > > > > > >> Hi All,
> > > > > > >>
> > > > > > >> I have a proposal to improve the transactional/idempotent
> > > producer's
> > > > > > >> handling of the UNKNOWN_PRODUCER error, which is the result of
> > > > losing
> > > > > > >> producer state following segment removal. The current behavior
> > is
> > > > both
> > > > > > >> complex and limited. Please take a look and let me know what
> you
> > > > > think.
> > > > > > >>
> > > > > > >> Thanks in advance to Matthias Sax for feedback on the initial
> > > draft.
> > > > > > >>
> > > > > > >> -Jason
> > > > > > >>
> > > > > > >
> > > > > >
> > > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
> --
> -- Guozhang
>

Reply via email to