Question about Lag Calculations in Apache Kafka Source Code
Hi everyone,

I am interested in understanding how the broker performs lag calculations. Specifically, I would like to explore changing the calculation to use the latest stable offset instead of the latest offset. I have noticed that AdminClient.listOffsets and AdminClient.listConsumerGroupOffsets can return different results, and I believe investigating this area of the source code might shed some light on potential optimizations.

Could you please point me to the part of the Apache Kafka source code where the lag calculations are performed? Any insights or pointers to help me get started would be greatly appreciated.

Thank you in advance for your assistance.

Best regards,
Henry
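For readers following the thread, the difference Henry is asking about can be sketched with plain arithmetic. This is a minimal model with made-up offset values, not code from the Kafka source: lag computed against the log-end offset counts records of still-open transactions, while lag computed against the last stable offset does not.

```java
// Minimal sketch (invented numbers, not Kafka internals): how consumer lag
// differs depending on which "end" offset it is measured against.
public class LagSketch {
    // lag = chosen end offset minus the group's committed offset, floored at 0
    static long lag(long endOffset, long committedOffset) {
        return Math.max(0, endOffset - committedOffset);
    }

    public static void main(String[] args) {
        long logEndOffset = 10;    // next offset to be assigned (the "latest" offset)
        long lastStableOffset = 7; // offsets 7..9 belong to an open transaction
        long committed = 7;        // the consumer group's committed offset

        System.out.println("lag vs log-end offset:     " + lag(logEndOffset, committed));    // 3
        System.out.println("lag vs last stable offset: " + lag(lastStableOffset, committed)); // 0
    }
}
```

Under this model, a group that has consumed everything committable still shows lag when measured against the log-end offset, which is the kind of discrepancy the two AdminClient calls can surface.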
RE: Offsets: consumption and production in rollback
Hi Andrew,

Yes, I have been using the requireStable option in the consumer group, but I consistently encounter the same issue. If I understand Kafka's logic correctly, it is essential for Kafka to retain those offsets even in the case of rollbacks, which is why relying on offsets within the application logic is not reliable. I need to explain this to my colleagues at work and would like to provide some context to the Spring Kafka developer.

Do you think the scenario of a rollback is similar to the effects of log compaction? https://kafka.apache.org/documentation/#design_compactionbasics

Furthermore, is there a way to force the cleanup of transactions? Could this potentially help address the issue?

Cordially,
Henry

From: Andrew Schofield
Sent: Wednesday, 28 June 2023 14:54
To: dev@kafka.apache.org
Subject: Re: Offsets: consumption and production in rollback

Hi Henry,

Consumers get to choose an isolation level. There’s one instance I can think of where AdminClient also has some ability to let users choose how to deal with uncommitted data. If you’ve not seen KIP-851, take a look: https://cwiki.apache.org/confluence/display/KAFKA/KIP-851%3A+Add+requireStable+flag+into+ListConsumerGroupOffsetsOptions

By your question, I expect you have seen it. Control records are kind of invisibly lurking like dark matter. The approach I’ve taken with this kind of thing is to cope with the fact that the offsets of the real records are increasing but not necessarily consecutive.
If I’m using READ_COMMITTED isolation level, there are also gaps when transactions roll back. I design my consumers so they are not surprised by the gaps and they don’t try to calculate the number of records. Instead, they just continually consume.

Hope this helps,
Andrew
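Andrew's advice above — tolerate gaps and never derive record counts from offset arithmetic — can be sketched as follows. This is a self-contained model with invented offsets, not the real Consumer API: it assumes aborted records and control markers occupy offsets that a READ_COMMITTED consumer never sees.

```java
import java.util.List;

// Sketch (hypothetical data, not the Kafka consumer API): under READ_COMMITTED,
// delivered offsets increase but are not consecutive, because aborted records
// and control records occupy offsets that are never delivered.
public class GapTolerantConsumer {
    // Count records by consuming them, never by subtracting offsets.
    static long countConsumed(List<Long> deliveredOffsets) {
        long count = 0;
        long lastOffset = -1;
        for (long offset : deliveredOffsets) {
            if (offset <= lastOffset) {
                throw new IllegalStateException("offsets must be increasing");
            }
            lastOffset = offset;
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        // Offsets 2..3 belonged to an aborted transaction and 4 was a control
        // record, so only these four offsets are ever delivered.
        List<Long> delivered = List.of(0L, 1L, 5L, 6L);
        System.out.println("records consumed: " + countConsumed(delivered)); // 4
        // Offset arithmetic (6 - 0 + 1) would wrongly suggest 7 records.
    }
}
```

The design point is that the consumer treats the offset stream as monotonically increasing but opaque, so gaps caused by rollbacks cost nothing.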
RE: Offsets: consumption and production in rollback
Hi Andrew,

Thank you for your response.

I understand your explanation, but in both cases I am using an "isolation level" of READ_COMMITTED. I believe the issue lies in the AdminClient.listOffsets method, as it may not be considering the isolation level, whereas the consumer of AdminClient.listConsumerGroupOffsets does consider it. What are your thoughts on this?

Additionally, would it be more suitable to implement a solution that reverts the offsets in case of transaction rollbacks? It's possible that there's a logic aspect I'm not fully grasping.

Perhaps I need to utilize the internal control records and their offsets. Could you point me in the right direction for their documentation?

Thank you,
Henry

From: Andrew Schofield
Sent: Tuesday, 27 June 2023 13:22
To: dev@kafka.apache.org
Subject: Fwd: Offsets: consumption and production in rollback

Hi Henry,

Thanks for your message.

Kafka transactions are a bit unusual. If you produce a message inside a transaction, it is assigned an offset on a topic-partition before the transaction even commits. That offset is not “revoked” if the transaction rolls back.

This is why the consumer has the concept of “isolation level”. It essentially controls whether the consumer can “see” the uncommitted or even rolled-back messages.

A consumer using the committed isolation level only consumes committed messages, but the offsets that it observes do reflect the uncommitted messages. So, if you observe the progress of the offsets of the records consumed, you see that they skip the messages that were produced but then rolled back. There are also invisible control records that are used to achieve transactional behaviour, and those also have offsets.

I’m not sure that this is really “bogus lag” but, when you’re using transactions, there’s not a one-to-one relationship between offsets and consumable records.
Hope this helps,
Andrew

Begin forwarded message:

From: Henry GALVEZ
Subject: Offsets: consumption and production in rollback
Date: 27 June 2023 at 10:48:31 BST
To: "us...@kafka.apache.org", "dev@kafka.apache.org"
Reply-To: dev@kafka.apache.org
Offsets: consumption and production in rollback
I have some doubts regarding message consumption and production, as well as transactional capabilities.

I am using a Kafka template to produce a message within a transaction. After that, I execute another transaction that produces a message and intentionally throws a runtime exception to simulate a transaction rollback. Next, I use the Kafka AdminClient to retrieve the latest offset for the topic partition and the consumer group's offsets for the same topic partition.

However, when I compare the offset numbers, I notice a difference: in this example, the consumer has 4 offsets, while the topic has only 2. I have come across references to this issue in a Spring Kafka report, specifically the Kafka-10683 report, where developers describe it as either "bogus" or "pseudo" lag.

I am keen on resolving this problem, and I would greatly appreciate hearing about your experiences and knowledge regarding this matter.

Thank you very much,
Henry
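The mismatch described in this experiment is consistent with Andrew's explanation earlier in the thread that aborted records and control markers also occupy offsets. A rough model of the bookkeeping, with simplified assumptions and invented names rather than Kafka internals:

```java
import java.util.List;

// Simplified model (not Kafka internals): every entry in a partition log
// occupies an offset, including records from aborted transactions and the
// control markers that commit or abort each transaction.
public class TransactionalOffsetModel {
    enum Kind { DATA_COMMITTED, DATA_ABORTED, CONTROL_MARKER }

    static long logEndOffset(List<Kind> log) {
        return log.size(); // the next offset that would be assigned
    }

    static long consumableRecords(List<Kind> log) {
        // Only committed data records are ever delivered under READ_COMMITTED.
        return log.stream().filter(k -> k == Kind.DATA_COMMITTED).count();
    }

    public static void main(String[] args) {
        List<Kind> log = List.of(
                Kind.DATA_COMMITTED, Kind.CONTROL_MARKER, // transaction 1: committed
                Kind.DATA_ABORTED, Kind.CONTROL_MARKER);  // transaction 2: rolled back

        System.out.println("log end offset:     " + logEndOffset(log));      // 4
        System.out.println("consumable records: " + consumableRecords(log)); // 1
        // A consumer that reads to the end of this log commits offset 4, so
        // comparing committed offsets with a count of consumable records
        // produces the "pseudo lag" effect described above.
    }
}
```

Under this model, the committed consumer offset legitimately runs ahead of the number of messages the application ever received, which is why offset counts and message counts diverge after a rollback.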