Thanks for the KIP. I just have some clarification questions to make
sure I understand the proposal correctly:

1) "Safe Epoch Incrementing"

> When the coordinator receives a new InitProducerId request, we will use the 
> following logic to update the epoch:
> 
> 1. No epoch is provided: the current epoch will be bumped and the last epoch 
> will be set to -1.
> 2. Epoch and producerId are provided, and the provided producerId matches the 
> current producerId or the provided producerId matches the previous producerId 
> and the provided epoch is exhausted:
>       a. Provided epoch matches current epoch: the last epoch will be set to 
> the current epoch, and the current epoch will be bumped .
>       b. Provided epoch matches last epoch: the current epoch will be returned
>       c. Else: return INVALID_PRODUCER_EPOCH
> 3. Otherwise, return INVALID_PRODUCER_EPOCH

Case (1) would be for a new producer. Hence, should we state that "no
PID" is provided (instead of "no epoch" is provided?). That might be
clearer and it implies that there is no epoch anyway.

Case (2) would be for a producer that received `UNKNOWN_PRODUCER_ID`
error and tries to re-initialize itself.

Case (2a) implies that the producer send its first request and is not
fenced. Case (2b) implies that the producer re-tries to re-initialize
itself, ie, it first request to re-initilize did not get a respond but
was processed by the transaction coordinator. Case (2c) implies that a
producer was fenced (similar case 3, even if I am not sure what case 3
actually would be?)

Please let me know if my understanding is correct.

What is still unclear to me is, why case (2 -- or is it only 2b?)
requires that the "provide epoch is exhausted"?

For case 2b:

Assume a producer with PID=100 and epoch=MAX_EPOCH that gets an
`UNKNOWN_PRODUCER_ID`. It sends initPidRequest with the corresponding
PID/epoch pair. The TC processes the request and creates a new PID=101
with new epoch=0, however, the respond to the producer is lost. The TC
still stores `currentPid=101`, `currentEpoch=0` and `previousPid=100`,
`previoudEpoch=MAX_EPOCH`. When the producer re-tries, the sent
PID/epoch still matches the previous PID/epoch pair and hence the TC
know it's a retry?

If this reasoning is correct, should the logic be as follows:

1. No PID is provided: create a new PID with epoch=0 and set the last
epoch to -1.
2. Epoch and producerId are provided
   a) the provided producerId/epoch matches the current producerId/epoch:
      i) if the epoch is not exhausted, bump the epoch
      ii) if the epoch is exhausted, create a new PID with epoch=0
   b) the provided producerId/epoch matches the previous
producerId/epoch: respond with current PID/epoch
   c) Otherwise, return INVALID_PRODUCER_EPOCH



-Matthias




On 4/4/19 3:47 PM, Jason Gustafson wrote:
> Hi Everyone,
> 
> Sorry for the long delay on this KIP. I have updated it to include the
> handling of INVALID_PRODUCER_ID_MAPPING as suggested above. If there are no
> further comments, I will plan to start a vote early next week.
> 
> Thanks!
> Jason
> 
> On Mon, Mar 25, 2019 at 2:33 PM Adam Bellemare <adam.bellem...@gmail.com>
> wrote:
> 
>> Ach - Sorry. I meant Jason. I had just read a John Roesler email.
>>
>> On Mon, Mar 25, 2019 at 5:21 PM Adam Bellemare <adam.bellem...@gmail.com>
>> wrote:
>>
>>> Hi John
>>>
>>> What is the status of this KIP?
>>>
>>> My teammates and I are running into the "UNKNOWN_PRODUCER_ID" error on
>>> 2.1.1 for a multitude of our internal topics, and I suspect that a proper
>>> fix is needed.
>>>
>>> Adam
>>>
>>> On Mon, Jan 7, 2019 at 7:42 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>>
>>>> Thanks Jason. The proposed solution sounds good to me.
>>>>
>>>>
>>>> Guozhang
>>>>
>>>> On Mon, Jan 7, 2019 at 3:52 PM Jason Gustafson <ja...@confluent.io>
>>>> wrote:
>>>>
>>>>> Hey Guozhang,
>>>>>
>>>>> Thanks for sharing the article. The INVALID_PRODUCER_ID_MAPPING error
>>>>> occurs following expiration of the producerId. It's possible that
>>>> another
>>>>> producerId has been installed in its place following expiration (if
>>>> another
>>>>> producer instance has become active), or the mapping is empty. We can
>>>>> safely retry the InitProducerId with the logic in this KIP in order to
>>>>> detect which case it is. So I'd suggest something like this:
>>>>>
>>>>> 1. After receiving INVALID_PRODUCER_ID_MAPPING, the producer can send
>>>>> InitProducerId using the current producerId and epoch.
>>>>> 2. If no mapping exists, the coordinator can generate a new producerId
>>>> and
>>>>> return it. If a transaction is in progress on the client, it will have
>>>> to
>>>>> be aborted, but the producer can continue afterwards.
>>>>> 3. Otherwise if a different producerId has been assigned, then we can
>>>>> return INVALID_PRODUCER_ID_MAPPING. To simplify error handling, we can
>>>>> probably raise this as ProducerFencedException since that is
>> effectively
>>>>> what has happened. Ideally this is the only fatal case that users have
>>>> to
>>>>> handle.
>>>>>
>>>>> I'll give it a little more thought and update the KIP.
>>>>>
>>>>> Thanks,
>>>>> Jason
>>>>>
>>>>> On Thu, Jan 3, 2019 at 1:38 PM Guozhang Wang <wangg...@gmail.com>
>>>> wrote:
>>>>>
>>>>>> You're right about the dangling txn since it will actually block
>>>>>> read-committed consumers from proceeding at all. I'd agree that
>> since
>>>>> this
>>>>>> is a very rare case, we can consider fixing it not via broker-side
>>>> logic
>>>>>> but via tooling in a future work.
>>>>>>
>>>>>> I've also discovered some related error handling logic inside
>> producer
>>>>> that
>>>>>> may be addressed together with this KIP (since it is mostly for
>>>> internal
>>>>>> implementations the wiki itself does not need to be modified):
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>> https://stackoverflow.com/questions/53976117/why-did-the-kafka-stream-fail-to-produce-data-after-a-long-time/54029181#54029181
>>>>>>
>>>>>> Guozhang
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Nov 29, 2018 at 2:25 PM Jason Gustafson <ja...@confluent.io
>>>
>>>>>> wrote:
>>>>>>
>>>>>>> Hey Guozhang,
>>>>>>>
>>>>>>> To clarify, the broker does not actually use the ApiVersion API
>> for
>>>>>>> inter-broker communications. The use of an API and its
>> corresponding
>>>>>>> version is controlled by `inter.broker.protocol.version`.
>>>>>>>
>>>>>>> Nevertheless, it sounds like we're on the same page about removing
>>>>>>> DescribeTransactionState. The impact of a dangling transaction is
>> a
>>>>>> little
>>>>>>> worse than what you describe though. Consumers with the
>>>> read_committed
>>>>>>> isolation level will be stuck. Still, I think we agree that this
>>>> case
>>>>>>> should be rare and we can reconsider for future work. Rather than
>>>>>>> preventing dangling transactions, perhaps we should consider
>> options
>>>>>> which
>>>>>>> allows us to detect them and recover. Anyway, this needs more
>>>> thought.
>>>>> I
>>>>>>> will update the KIP.
>>>>>>>
>>>>>>> Best,
>>>>>>> Jason
>>>>>>>
>>>>>>> On Tue, Nov 27, 2018 at 6:51 PM Guozhang Wang <wangg...@gmail.com
>>>
>>>>>> wrote:
>>>>>>>
>>>>>>>> 0. My original question is about the implementation details
>>>>> primarily,
>>>>>>>> since current the handling logic of the APIVersionResponse is
>>>> simply
>>>>>> "use
>>>>>>>> the highest supported version of the corresponding request", but
>>>> if
>>>>> the
>>>>>>>> returned response from APIVersionRequest says "I don't even know
>>>>> about
>>>>>>> the
>>>>>>>> DescribeTransactionStateRequest at all", then we need additional
>>>>> logic
>>>>>>> for
>>>>>>>> the falling back logic. Currently this logic is embedded in
>>>>>> NetworkClient
>>>>>>>> which is shared by all clients, so I'd like to avoid making this
>>>>> logic
>>>>>>> more
>>>>>>>> complicated.
>>>>>>>>
>>>>>>>> As for the general issue that a broker does not recognize a
>>>> producer
>>>>>> with
>>>>>>>> sequence number 0, here's my thinking: as you mentioned in the
>>>> wiki,
>>>>>> this
>>>>>>>> is only a concern for transactional producer since for
>> idempotent
>>>>>>> producer
>>>>>>>> it can just bump the epoch and go. For transactional producer,
>>>> even
>>>>> if
>>>>>>> the
>>>>>>>> producer request from a fenced producer gets accepted, its
>>>>> transaction
>>>>>>> will
>>>>>>>> never be committed and hence messages not exposed to
>>>> read-committed
>>>>>>>> consumers as well. The drawback is though, 1) read-uncommitted
>>>>>> consumers
>>>>>>>> will still read those messages, 2) unnecessary storage for those
>>>>> fenced
>>>>>>>> produce messages, but in practice should not accumulate to a
>> large
>>>>>> amount
>>>>>>>> since producer should soon try to commit and be told it is
>> fenced
>>>> and
>>>>>>> then
>>>>>>>> stop, 3) there will be no markers for those transactional
>> messages
>>>>>> ever.
>>>>>>>> Looking at the list and thinking about the likelihood it may
>>>> happen
>>>>>>>> assuming we retain the producer up to transactional.id.timeout
>>>>> (default
>>>>>>> is
>>>>>>>> 7 days), I feel comfortable leaving it as is.
>>>>>>>>
>>>>>>>> Guozhang
>>>>>>>>
>>>>>>>> On Mon, Nov 26, 2018 at 6:09 PM Jason Gustafson <
>>>> ja...@confluent.io>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hey Guozhang,
>>>>>>>>>
>>>>>>>>> Thanks for the comments. Responses below:
>>>>>>>>>
>>>>>>>>> 0. The new API is used between brokers, so we govern its usage
>>>>> using
>>>>>>>>> `inter.broker.protocol.version`. If the other broker hasn't
>>>>> upgraded,
>>>>>>> we
>>>>>>>>> will just fallback to the old logic, which is to accept the
>>>> write.
>>>>>> This
>>>>>>>> is
>>>>>>>>> similar to how we introduced the OffsetsForLeaderEpoch API.
>> Does
>>>>> that
>>>>>>>> seem
>>>>>>>>> reasonable?
>>>>>>>>>
>>>>>>>>> To tell the truth, after digging this KIP up and reading it
>>>> over, I
>>>>>> am
>>>>>>>>> doubting how crucial this API is. It is attempting to protect
>> a
>>>>> write
>>>>>>>> from
>>>>>>>>> a zombie which has just reset its sequence number after that
>>>>> producer
>>>>>>> had
>>>>>>>>> had its state cleaned up. However, one of the other
>>>> improvements in
>>>>>>> this
>>>>>>>>> KIP is to maintain producer state beyond its retention in the
>>>> log.
>>>>> I
>>>>>>>> think
>>>>>>>>> that makes this case sufficiently unlikely that we can leave
>> it
>>>> for
>>>>>>>> future
>>>>>>>>> work. I am not 100% sure this is the only scenario where
>>>>> transaction
>>>>>>>> state
>>>>>>>>> and log state can diverge anyway, so it would be better to
>>>> consider
>>>>>>> this
>>>>>>>>> problem more generally. What do you think?
>>>>>>>>>
>>>>>>>>> 1. Thanks, from memory, the term changed after the first
>>>> iteration.
>>>>>>> I'll
>>>>>>>>> make a pass and try to clarify usage.
>>>>>>>>> 2. I was attempting to handle some edge cases since this check
>>>>> would
>>>>>> be
>>>>>>>>> asynchronous. In any case, if we drop this validation as
>>>> suggested
>>>>>>> above,
>>>>>>>>> then we can ignore this.
>>>>>>>>>
>>>>>>>>> -Jason
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Nov 13, 2018 at 6:23 PM Guozhang Wang <
>>>> wangg...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hello Jason, thanks for the great write-up.
>>>>>>>>>>
>>>>>>>>>> 0. One question about the migration plan: "The new
>>>>>>> GetTransactionState
>>>>>>>>> API
>>>>>>>>>> and the new version of the transaction state message will
>> not
>>>> be
>>>>>> used
>>>>>>>>> until
>>>>>>>>>> the inter-broker version supports it." I'm not so clear
>> about
>>>> the
>>>>>>>>>> implementation details here: say a broker is on the newer
>>>> version
>>>>>> and
>>>>>>>> the
>>>>>>>>>> txn-coordinator is still on older version. Today the
>>>>>>> APIVersionsRequest
>>>>>>>>> can
>>>>>>>>>> only help upgrade / downgrade the request version, but not
>>>>>> forbidding
>>>>>>>>>> sending any. Are you suggesting we add additional logic on
>> the
>>>>>> broker
>>>>>>>>> side
>>>>>>>>>> to handle the case of "not sending the request"? If yes my
>>>>> concern
>>>>>> is
>>>>>>>>> that
>>>>>>>>>> this will be some tech-debt code that will live long before
>>>> being
>>>>>>>>> removed.
>>>>>>>>>>
>>>>>>>>>> Some additional minor comments:
>>>>>>>>>>
>>>>>>>>>> 1. "last epoch" and "instance epoch" seem to be referring to
>>>> the
>>>>>> same
>>>>>>>>> thing
>>>>>>>>>> in your wiki.
>>>>>>>>>> 2. "The broker must verify after receiving the response that
>>>> the
>>>>>>>> producer
>>>>>>>>>> state is still unknown.": not sure why we have to validate?
>> If
>>>>> the
>>>>>>>>> metadata
>>>>>>>>>> returned from the txn-coordinator can always be considered
>> the
>>>>>>>>>> source-of-truth, can't we just bindly use it to update the
>>>> cache?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Guozhang
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Sep 6, 2018 at 9:10 PM Matthias J. Sax <
>>>>>>> matth...@confluent.io>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> I am +1 on this :)
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> -Matthias
>>>>>>>>>>>
>>>>>>>>>>> On 9/4/18 9:55 AM, Jason Gustafson wrote:
>>>>>>>>>>>> Bump. Thanks to Magnus for noticing that I forgot to
>> link
>>>> to
>>>>>> the
>>>>>>>> KIP:
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-360%3A+Improve+handling+of+unknown+producer
>>>>>>>>>>>> .
>>>>>>>>>>>>
>>>>>>>>>>>> -Jason
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Aug 21, 2018 at 4:37 PM, Jason Gustafson <
>>>>>>>> ja...@confluent.io
>>>>>>>>>>
>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi All,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I have a proposal to improve the
>> transactional/idempotent
>>>>>>>> producer's
>>>>>>>>>>>>> handling of the UNKNOWN_PRODUCER error, which is the
>>>> result
>>>>> of
>>>>>>>>> losing
>>>>>>>>>>>>> producer state following segment removal. The current
>>>>> behavior
>>>>>>> is
>>>>>>>>> both
>>>>>>>>>>>>> complex and limited. Please take a look and let me know
>>>> what
>>>>>> you
>>>>>>>>>> think.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks in advance to Matthias Sax for feedback on the
>>>>> initial
>>>>>>>> draft.
>>>>>>>>>>>>>
>>>>>>>>>>>>> -Jason
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> -- Guozhang
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> -- Guozhang
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> -- Guozhang
>>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> -- Guozhang
>>>>
>>>
>>
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to