Re: What role plays transactional.id after KIP-447?
I think `commitTransaction` should not throw CommitFailedException. Admittedly we are overloading the term "commit" here, since we use it for two operations: committing the offsets (used by the consumer, in either EOS or ALOS) and committing the transaction. The exception is meant for the former and would not be expected from `commitTransaction`.

Guozhang
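[Editor's note] Guozhang's distinction can be made concrete in the consume-transform-produce loop. The sketch below is illustrative only (topic, group, and transactional.id names are made up, not from the thread) and assumes kafka-clients 2.5+ on the classpath plus a running broker:

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;

public class EosFencingSketch {
    public static void main(String[] args) {
        Properties cp = new Properties();
        cp.put("bootstrap.servers", "localhost:9092");
        cp.put("group.id", "example-group");                 // made-up name
        cp.put("enable.auto.commit", "false");
        cp.put("isolation.level", "read_committed");
        cp.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        cp.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cp);
        consumer.subscribe(List.of("input-topic"));

        Properties pp = new Properties();
        pp.put("bootstrap.servers", "localhost:9092");
        pp.put("transactional.id", "example-group-input-topic"); // one producer per consumer (KIP-447)
        pp.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        pp.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(pp);
        producer.initTransactions();

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            if (records.isEmpty()) continue;
            try {
                producer.beginTransaction();
                Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                for (ConsumerRecord<String, String> r : records) {
                    producer.send(new ProducerRecord<>("output-topic", r.key(), r.value()));
                    offsets.put(new TopicPartition(r.topic(), r.partition()),
                            new OffsetAndMetadata(r.offset() + 1));
                }
                // KIP-447 fencing happens here, against the consumer's group generation,
                // so a zombie should expect CommitFailedException at this call ...
                producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
                // ... while commitTransaction itself is not expected to throw it.
                producer.commitTransaction();
            } catch (ProducerFencedException e) {
                // txn.id-based fencing: another producer with the same transactional.id
                // bumped the epoch. The producer is unusable; close it and bail out.
                producer.close();
                throw e;
            } catch (CommitFailedException e) {
                // Group-generation mismatch: we are a zombie for these partitions.
                // Abort and let the next poll() rejoin with a fresh generation.
                producer.abortTransaction();
            }
        }
    }
}
```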
Gabriel Giussi (Thu, Jun 2, 2022, 5:45 AM):
"I think we may have overlooked it in documentation to emphasize that, in case 1), it should not expect ProducerFencedException. If so, we can fix the javadoc."

IMHO that would be nice. I'm reviewing an existing codebase where we were only handling ProducerFencedException, because the javadoc and the method signature are explicit only about that; CommitFailedException is not even referenced and falls under the general KafkaException.

I think this could happen in both sendOffsetsToTransaction and commitTransaction, right?

Thanks.
Guozhang Wang (Tue, May 31, 2022, 2:49 PM):
The CommitFailedException should be expected, since the fencing happens at the consumer coordinator. That is, we can only fence the consumer-producer pair by the consumer's generation; we cannot fence via the producer epoch, since no other producer has grabbed the same txn.id and bumped the epoch.

So, just to clarify, when the zombie comes back it could be fenced either when:

1) It tries to complete the ongoing transaction via `sendOffsetsToTransaction`, in which case it sees the CommitFailedException. The caller is then responsible for handling the thrown exception, which indicates being fenced.
2) It tries to heartbeat in the background thread and gets an InvalidGeneration error code, which triggers onPartitionsLost. The callback implementation class is then responsible for handling that case, which indicates being fenced.

I think we may have overlooked it in documentation to emphasize that, in case 1), it should not expect ProducerFencedException. If so, we can fix the javadoc.
Gabriel Giussi (Tue, May 31, 2022, 8:26 AM):
But there is no guarantee that the onPartitionsLost callback will be called before a zombie producer coming back to life tries to continue with the transaction, e.g. sending offsets or committing, so I should handle the exception first, and I could directly create a new producer there instead of doing it in the callback.

The curious part for me is that I was able to reproduce a case that simulates a zombie producer trying to send offsets after a rebalance, but instead of failing with a ProducerFencedException it fails with a CommitFailedException with this message: "Transaction offset Commit failed due to consumer group metadata mismatch: Specified group generation id is not valid." That makes sense, but it is not even documented in KafkaProducer#sendOffsetsToTransaction.

Is this the expected behaviour, or should it fail with a ProducerFencedException when the generation.id is outdated?

The case I reproduced is like this:
1. Consumer A subscribes to topic X partitions 0 and 1, and starts a producer with transactional.id = "tid.123".
2. It consumes a message from partition 1 and hands it to another thread to be processed (so the poll thread is not blocked).
3. Producer A begins a transaction, sends to the output topic, and gets blocked (I'm using a lock to simulate long processing) before calling sendOffsetsToTransaction.
4. Consumer B is created, gets assigned partition 1 (I'm using CooperativeStickyAssignor), and creates a producer with transactional.id = "tid.456".
5. Consumer B fetches the same message, processes it, and commits the transaction successfully.
6. Producer A calls sendOffsetsToTransaction (because the lock was released) and fails with CommitFailedException.

This behaviour reflects what is described here:
https://www.confluent.io/blog/simplified-robust-exactly-one-semantics-in-kafka-2-5/#client-api-simplification
but I was actually expecting a ProducerFencedException instead. Does that exception only correspond to fencing done by transactional.id?

Thanks
Guozhang Wang (Tue, May 24, 2022, 8:30 PM):
No problem.

The key is that at step 4, when the consumer re-joins, it will be aware that it has lost its previously assigned partitions and will trigger `onPartitionsLost` on the rebalance callback. And since in your scenario it's a 1-1 mapping from consumer to producer, that means the producer has been fenced and hence should be closed.

So at that step 4, the old producer of Client A should be closed within the rebalance callback, and then one can create a new producer to pair with the re-joined consumer.
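[Editor's note] Guozhang's suggestion (close the fenced producer inside the rebalance callback and pair a fresh one with the re-joined consumer) could look roughly like the sketch below. All names are illustrative, and `newTransactionalProducer()` is a hypothetical factory standing in for your own setup code:

```java
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.TopicPartition;

public class FencedProducerReplacement {
    // Hypothetical factory: builds a producer with this consumer's transactional.id
    // and readies it for transactions; details depend on your own configuration.
    static KafkaProducer<String, String> newTransactionalProducer() {
        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9092");
        p.put("transactional.id", "example-group-topic-x");  // made-up id
        p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(p);
        producer.initTransactions();
        return producer;
    }

    public static void main(String[] args) {
        AtomicReference<KafkaProducer<String, String>> producerRef =
                new AtomicReference<>(newTransactionalProducer());

        Properties c = new Properties();
        c.put("bootstrap.servers", "localhost:9092");
        c.put("group.id", "example-group");
        c.put("enable.auto.commit", "false");
        c.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        c.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(c);

        consumer.subscribe(List.of("topic-x"), new ConsumerRebalanceListener() {
            @Override public void onPartitionsRevoked(Collection<TopicPartition> parts) { }
            @Override public void onPartitionsAssigned(Collection<TopicPartition> parts) { }
            @Override public void onPartitionsLost(Collection<TopicPartition> parts) {
                // Our old generation is gone, so the paired producer is effectively
                // fenced: close it and pair a fresh one with the re-joined consumer.
                producerRef.get().close();
                producerRef.set(newTransactionalProducer());
            }
        });
        // ... poll loop using consumer and producerRef.get() ...
    }
}
```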
Gabriel Giussi (Tue, May 24, 2022, 1:30 PM):
Last question: the fencing occurs with sendOffsetsToTransaction, which includes ConsumerGroupMetadata; I guess the generation.id is what matters here, since it is bumped with each rebalance. But couldn't this happen?

1. Client A consumes from topic partition P1 with generation.id = 1, and a producer associated with it produces to some output topic, but a long GC pause occurs before calling sendOffsetsToTransaction.
2. Client A gets out of sync and becomes a zombie due to session timeout; the group rebalances.
3. Client B is assigned topic partition P1 with generation.id = 2, calls sendOffsetsToTransaction, and commits the txn.
4. Client A is back online and joins again with generation.id = 3 (this happens in some internal thread).
5. The thread that was about to call sendOffsetsToTransaction is scheduled and calls sendOffsetsToTransaction with generation.id = 3, which is the current one, so it won't be fenced.

I'm asking this because we always ask the consumer object for the current consumerGroupMetadata, not the one that was in effect when the offsets were consumed, like this:

producer.sendOffsetsToTransaction(consumedOffsets, consumer.groupMetadata());

Couldn't this return a groupMetadata that has a valid generation.id even when it is not the same as at the moment of consuming the messages that are about to be committed?

I'm sure I'm missing something (probably in step 4) that makes this not a possible scenario, but I can't say what it is.

Sorry if the question is too confusing.
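[Editor's note] One defensive option for the concern above (a sketch, not something prescribed in the thread): snapshot the group metadata right after poll() and commit with that snapshot, instead of re-reading consumer.groupMetadata() at commit time. ConsumerGroupMetadata is an immutable value object, so the snapshot keeps the generation under which the records were actually fetched, and a generation bump between fetch and commit then fails the commit rather than being papered over:

```java
import java.time.Duration;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.TopicPartition;

public class SnapshotGroupMetadata {
    static void processOneBatch(KafkaConsumer<String, String> consumer,
                                KafkaProducer<String, String> producer,
                                Map<TopicPartition, OffsetAndMetadata> offsets) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        // Capture the generation these records were fetched under.
        ConsumerGroupMetadata atFetch = consumer.groupMetadata();

        // ... transform `records`, produce results, fill `offsets` ...

        producer.beginTransaction();
        // Commit with the snapshot, not a fresh consumer.groupMetadata() call.
        producer.sendOffsetsToTransaction(offsets, atFetch);
        producer.commitTransaction();
    }
}
```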
Guozhang Wang (Tue, May 24, 2022, 12:49 PM):
Hi Gabriel, What I meant is that with KIP-447, the fencing is achieved by the time of committing with the consumer metadata. If within a transaction, the producer would always try to commit at least once on behalf of the consumer, AND a zombie of the producer would always come from a zombie of a consumer, then the transaction would be guaranteed to be fenced. But: 1) If within a transaction, there's no `sendOffset..` triggered, then fencing still need to be done by the txn coordinator, and txn.id plays the role here I think this is not your scenario. 2) If a consumer may be "represented" by multiple producers, and a zombie producer does not come from a zombie consumer, then we still need the fencing be done via the txn.id --- this is the scenario I'd like to remind you about. For example, if two producers could be (mistakenly) created with different txn.ids and are paired with the same consumer, then the new API in KIP-447 would not fence one of them. Guozhang On Tue, May 24, 2022 at 5:50 AM Gabriel Giussi wrote: > Hello Guozhang, > > thanks for the response, I have some doubts about the "N-1 > producer-consumer" case you mentioned and why I may need to configure the > transactional id there and how. Is this a case of N consumers sharing the > same producer right? > > My current implementation is creating a consumer per topic (I don't > subscribe to multiple topics from the same consumer) and starting a > producer per consumer, so the relation is 1 consumer/topic => 1 producer > and the transactional id is set as --. > Do you see any problem with this configuration? > > Thanks again. 
> > El sáb, 21 may 2022 a las 16:37, Guozhang Wang () > escribió: > > > Hello Gabriel, > > > > What you're asking is a very fair question :) In fact, for Streams where > > the partition-assignment to producer-consumer pairs are purely flexible, > we > > think the new EOS would not have hard requirement on transactional.id: > > https://issues.apache.org/jira/browse/KAFKA-9453 > > > > I you implemented the transactional messaging via a DIY producer+consumer > > though, it depends on how you'd expect the life-time of a producer, e.g. > if > > you do not have a 1-1 producer-consumer mapping then transactional.id is > > not crucial, but if your have a N-1 producer-consumer mapping then you > may > > still need to configure that id. > > > > > > Guozhang > > > > > > > > On Fri, May 20, 2022 at 8:39 AM Gabriel Giussi > > wrote: > > > > > Before KIP-447 I understood the use of transactional.id to prevent us > > from > > > zombies introducing duplicates, as explained in this talk > > > https://youtu.be/j0l_zUhQaTc?t=822. > > > So in order to get zombie fencing working correctly we should assign > > > producers with a transactional.id that included the partition id, > > > something > > > like -, as shown in this slide > > > https://youtu.be/j0l_zUhQaTc?t=1047 where processor 2 should use the > > same > > > txnl.id A as the process 1 that crashed. > > > This prevented us from having process 2 consuming the message again and > > > committing, while process 1 could come back to life and also commit the > > > pending transaction, hence having duplicates message being produced. In > > > this case process 1 will be fenced by having an outdated epoch. 
> > >
> > > With KIP-447 we no longer have that potential scenario of two pending transactions trying to produce and mark a message as committed, because we won't let process 2 even start the transaction if there is a pending one (basically by not returning any messages, since we reject the OffsetFetch if there is a pending transaction for that offset partition). This is explained in this post:
> > > https://www.confluent.io/blog/simplified-robust-exactly-one-semantics-in-kafka-2-5/#client-api-simplification
> > >
> > > Given that, I no longer see the value of transactional.id, or how I should configure it in my producers. The main benefit of KIP-447 is that we no longer have to start one producer per input partition; a quote from the post:
> > > "The only way the static assignment requirement could be met is if each input partition uses a separate producer instance, which is in fact what Kafka Streams previously relied on. However, this made running EOS applications much more costly in terms of the client resources and load on the brokers. A large number of client connections could heavily impact the stability of brokers and become a waste of resources as well."
> > >
> > > I guess now I can reuse my producer between different input partitions, so what transactional.id should I assign to it, and why should I care? Isn't zombie fencing resolved by rejecting the offset fetch already?
> > >
> > > Thanks.
> >
> > --
> > -- Guozhang

--
-- Guozhang
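The fencing path described in case 2) of the reply above (and in the thread's later discussion of CommitFailedException) can be illustrated with a toy model. This is a simulation of the group coordinator's generation check, not the real Kafka client or broker code; all class and method names here are invented for illustration:

```python
# Toy simulation of KIP-447-style fencing: the group coordinator rejects a
# transactional offset commit whose consumer generation id is stale. This is
# how a zombie producer paired with a zombie consumer gets fenced at commit
# time, without relying on transactional.id.

class GroupCoordinator:
    def __init__(self):
        self.generation = 0

    def rebalance(self):
        # Every rebalance bumps the group generation.
        self.generation += 1
        return self.generation

    def txn_offset_commit(self, claimed_generation):
        # Corresponds to CommitFailedException on the client side
        # ("Specified group generation id is not valid").
        if claimed_generation != self.generation:
            raise RuntimeError("Specified group generation id is not valid")
        return "committed"


coordinator = GroupCoordinator()

zombie_gen = coordinator.rebalance()  # consumer A joins: generation 1
fresh_gen = coordinator.rebalance()   # consumer B triggers a rebalance: generation 2

# The new consumer/producer pair commits fine.
assert coordinator.txn_offset_commit(fresh_gen) == "committed"

# The zombie, stuck on the old generation, is fenced at commit time.
fenced = False
try:
    coordinator.txn_offset_commit(zombie_gen)
except RuntimeError:
    fenced = True
print("zombie fenced:", fenced)
```

Note that nothing in this model looks at transactional.id: the stale generation alone is enough to reject the commit, which is why this path cannot fence two producers that share one live consumer.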
Re: What role plays transactional.id after KIP-447?
Hello Guozhang,

thanks for the response. I have some doubts about the "N-1 producer-consumer" case you mentioned, why I may need to configure the transactional id there, and how. Is this a case of N consumers sharing the same producer?

My current implementation is creating a consumer per topic (I don't subscribe to multiple topics from the same consumer) and starting a producer per consumer, so the relation is 1 consumer/topic => 1 producer and the transactional id is set as --.
Do you see any problem with this configuration?

Thanks again.

On Sat, May 21, 2022 at 16:37, Guozhang Wang () wrote:

> Hello Gabriel,
>
> What you're asking is a very fair question :) In fact, for Streams, where the partition assignment to producer-consumer pairs is purely flexible, we think the new EOS would not have a hard requirement on transactional.id:
> https://issues.apache.org/jira/browse/KAFKA-9453
>
> If you implemented the transactional messaging via a DIY producer+consumer though, it depends on how you'd expect the lifetime of a producer, e.g. if you do not have a 1-1 producer-consumer mapping then transactional.id is not crucial, but if you have an N-1 producer-consumer mapping then you may still need to configure that id.
>
> Guozhang
>
> On Fri, May 20, 2022 at 8:39 AM Gabriel Giussi wrote:
>
> > Before KIP-447 I understood the use of transactional.id to prevent us from zombies introducing duplicates, as explained in this talk: https://youtu.be/j0l_zUhQaTc?t=822.
> > So in order to get zombie fencing working correctly we should assign producers a transactional.id that includes the partition id, something like -, as shown in this slide (https://youtu.be/j0l_zUhQaTc?t=1047) where processor 2 should use the same txnl.id A as process 1 that crashed.
> > This prevented us from having process 2 consume the message again and commit, while process 1 could come back to life and also commit the pending transaction, hence having duplicate messages produced. In this case process 1 will be fenced by having an outdated epoch.
> >
> > With KIP-447 we no longer have that potential scenario of two pending transactions trying to produce and mark a message as committed, because we won't let process 2 even start the transaction if there is a pending one (basically by not returning any messages, since we reject the OffsetFetch if there is a pending transaction for that offset partition). This is explained in this post:
> > https://www.confluent.io/blog/simplified-robust-exactly-one-semantics-in-kafka-2-5/#client-api-simplification
> >
> > Given that, I no longer see the value of transactional.id, or how I should configure it in my producers. The main benefit of KIP-447 is that we no longer have to start one producer per input partition; a quote from the post:
> > "The only way the static assignment requirement could be met is if each input partition uses a separate producer instance, which is in fact what Kafka Streams previously relied on. However, this made running EOS applications much more costly in terms of the client resources and load on the brokers. A large number of client connections could heavily impact the stability of brokers and become a waste of resources as well."
> >
> > I guess now I can reuse my producer between different input partitions, so what transactional.id should I assign to it, and why should I care? Isn't zombie fencing resolved by rejecting the offset fetch already?
> >
> > Thanks.
>
> --
> -- Guozhang
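The "reject the OffsetFetch while a transaction is pending" behavior quoted above can be sketched as a toy model. This is not the real broker code and the names are invented; it only shows the shape of the mechanism: a new consumer cannot even see the offsets (and thus cannot start processing) until the pending transaction resolves:

```python
# Toy sketch of the KIP-447 offset-fetch gate: while a transaction that
# touches an offsets partition is still pending, an offset fetch for it is
# rejected with a retriable error, so the client retries instead of reading
# a possibly-aborted offset.

class OffsetsPartition:
    def __init__(self):
        self.pending_txn = False
        self.committed_offset = 0
        self._staged = None

    def begin_txn_commit(self, offset):
        # A transactional offset commit is staged but not yet decided.
        self.pending_txn = True
        self._staged = offset

    def complete_txn(self):
        # The transaction commits; the staged offset becomes visible.
        self.committed_offset = self._staged
        self.pending_txn = False

    def offset_fetch(self):
        if self.pending_txn:
            # The client would retry until the pending transaction resolves.
            raise RuntimeError("UNSTABLE_OFFSET_COMMIT: pending transaction")
        return self.committed_offset


p = OffsetsPartition()
p.begin_txn_commit(42)

rejected = False
try:
    p.offset_fetch()           # rejected while the transaction is open
except RuntimeError:
    rejected = True
print("fetch rejected while pending:", rejected)

p.complete_txn()
print("offset after commit:", p.offset_fetch())
```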
Re: What role plays transactional.id after KIP-447?
Hello Gabriel,

What you're asking is a very fair question :) In fact, for Streams, where the partition assignment to producer-consumer pairs is purely flexible, we think the new EOS would not have a hard requirement on transactional.id:
https://issues.apache.org/jira/browse/KAFKA-9453

If you implemented the transactional messaging via a DIY producer+consumer though, it depends on how you'd expect the lifetime of a producer, e.g. if you do not have a 1-1 producer-consumer mapping then transactional.id is not crucial, but if you have an N-1 producer-consumer mapping then you may still need to configure that id.

Guozhang

On Fri, May 20, 2022 at 8:39 AM Gabriel Giussi wrote:

> Before KIP-447 I understood the use of transactional.id to prevent us from zombies introducing duplicates, as explained in this talk: https://youtu.be/j0l_zUhQaTc?t=822.
> So in order to get zombie fencing working correctly we should assign producers a transactional.id that includes the partition id, something like -, as shown in this slide (https://youtu.be/j0l_zUhQaTc?t=1047) where processor 2 should use the same txnl.id A as process 1 that crashed.
> This prevented us from having process 2 consume the message again and commit, while process 1 could come back to life and also commit the pending transaction, hence having duplicate messages produced. In this case process 1 will be fenced by having an outdated epoch.
>
> With KIP-447 we no longer have that potential scenario of two pending transactions trying to produce and mark a message as committed, because we won't let process 2 even start the transaction if there is a pending one (basically by not returning any messages, since we reject the OffsetFetch if there is a pending transaction for that offset partition).
> This is explained in this post:
> https://www.confluent.io/blog/simplified-robust-exactly-one-semantics-in-kafka-2-5/#client-api-simplification
>
> Given that, I no longer see the value of transactional.id, or how I should configure it in my producers. The main benefit of KIP-447 is that we no longer have to start one producer per input partition; a quote from the post:
> "The only way the static assignment requirement could be met is if each input partition uses a separate producer instance, which is in fact what Kafka Streams previously relied on. However, this made running EOS applications much more costly in terms of the client resources and load on the brokers. A large number of client connections could heavily impact the stability of brokers and become a waste of resources as well."
>
> I guess now I can reuse my producer between different input partitions, so what transactional.id should I assign to it, and why should I care? Isn't zombie fencing resolved by rejecting the offset fetch already?
>
> Thanks.

--
-- Guozhang
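The pre-KIP-447 fencing that the question above refers to ("process 1 will be fenced by having an outdated epoch") can be sketched as a toy model. This simulates the transaction coordinator's epoch bump, not the real broker code; the transactional.id value and all names are invented for illustration:

```python
# Toy simulation of pre-KIP-447 zombie fencing: with a static
# transactional.id per input partition, the txn coordinator bumps the
# producer epoch on each InitProducerId, so a crashed producer that comes
# back with an old epoch is fenced (ProducerFencedException on the client).

class TxnCoordinator:
    def __init__(self):
        self.epochs = {}  # transactional.id -> current epoch

    def init_producer_id(self, txn_id):
        # Each (re)initialization under the same transactional.id bumps
        # the epoch, invalidating any earlier holder of that id.
        epoch = self.epochs.get(txn_id, -1) + 1
        self.epochs[txn_id] = epoch
        return epoch

    def commit_txn(self, txn_id, epoch):
        if epoch < self.epochs[txn_id]:
            raise RuntimeError("producer fenced: stale epoch")
        return "committed"


coordinator = TxnCoordinator()

# Processor 1 owns partition 0 and uses transactional.id "app-input-0".
epoch1 = coordinator.init_producer_id("app-input-0")

# Processor 1 stalls; processor 2 takes over the SAME transactional.id,
# which bumps the epoch.
epoch2 = coordinator.init_producer_id("app-input-0")

assert coordinator.commit_txn("app-input-0", epoch2) == "committed"

# The zombie comes back to life and tries to commit with its old epoch.
fenced = False
try:
    coordinator.commit_txn("app-input-0", epoch1)
except RuntimeError:
    fenced = True
print("zombie fenced:", fenced)
```

This is why the static per-partition transactional.id mattered before KIP-447: the fencing key is the id itself, so the new owner of a partition had to reuse the exact same id to bump the zombie's epoch.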