[jira] [Commented] (KAFKA-9953) support multiple consumerGroupCoordinators in TransactionManager
[ https://issues.apache.org/jira/browse/KAFKA-9953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17147635#comment-17147635 ]

Joost van de Wijgerd commented on KAFKA-9953:

Hi [~bchen225242],

I agree that we are not using the recommended usage pattern. The issue, however, is that the pattern we are using is fully functional (we have been running it in production for 9 months now). Because the TransactionManager only supports one GroupCoordinator, it keeps 'flipping' between group coordinators, and with the default retry timeout a 100ms penalty is incurred every time this happens (since discovering this issue we have set the timeout to 0ms). I have patched kafka-clients 2.5.0 with my fix and we are currently running this in production with no issues whatsoever.

As to your point about the consumer groups rebalancing: correct me if I am wrong, but I think this has no impact on the location of the ConsumerGroupCoordinator on the Broker. My fix merely keeps track of which Broker hosts a given ConsumerGroupCoordinator, so I don't see how this would be an issue.

I do agree with you that if you use the many -> one mapping you cannot/should not use automatic rebalancing. We are indeed using our own assignment strategy, because we want partitions of different Topics with the same ordinal to map to the same application instance. If we let Kafka do the allocation, I think this pattern would indeed not work correctly.

I stand by my position that implementing this improvement does not hurt the current recommended pattern at all, while it does support the many -> one pattern in a performant way. I don't think you have to update your documentation for this unless you want to point it out to your users specifically.
If you decide not to implement this improvement, I would suggest logging a WARN message that alerts the developer to this issue, so they can fix the problem at an early stage of development. Currently there is an INFO message when a new ConsumerGroupCoordinator is found. This was my only clue to finding the problem, and unfortunately it came after we had implemented our framework around the many consumers -> one producer concept.

To answer your question: implementing a proper switch to the one -> one Consumer to Producer mapping would be a big change for us. Pairing extra Producers to our existing Consumers should be a lot easier, but we would essentially be using them to implement the Map of ConsumerGroupCoordinators, so for me it is then a better option to run with a patched kafka-clients library. However, in the long run this is not very sustainable either.

Best Regards,
Joost

> support multiple consumerGroupCoordinators in TransactionManager
>
>                 Key: KAFKA-9953
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9953
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>    Affects Versions: 2.5.0
>            Reporter: Joost van de Wijgerd
>            Priority: Major
>         Attachments: KAFKA-9953.patch
>
> We are using kafka with a transactional producer and have the following use
> case:
> 3 KafkaConsumers (each with their own ConsumerGroup) polled by the same
> thread and 1 transactional kafka producer. When we add the offsets to the
> transaction we run into the following problem:
> TransactionManager only keeps track of 1 consumerGroupCoordinator. However,
> some consumerGroupCoordinators can be on another node, so we constantly see
> the TransactionManager switching between nodes. This has an overhead of
> 1 failing _TxnOffsetCommitRequest_ and 1 unnecessary _FindCoordinatorRequest_.
> Also, with _retry.backoff.ms_ set to 100 by default, this causes a pause
> of 100ms for every other transaction (depending on which KafkaConsumer
> triggered the transaction, of course).
> If the TransactionManager could keep track of coordinator nodes per
> consumerGroupId this problem would be solved.
> I already have a patch for this but still need to test it. I will add it to
> the ticket when that is done.

--
This message was sent by Atlassian Jira (v8.3.4#803005)
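The difference between remembering a single coordinator and remembering one per consumerGroupId can be illustrated with a small stand-alone model. This is only a sketch under stated assumptions: it does not use or reproduce the kafka-clients TransactionManager or the attached patch, and the class, method and group names (`CoordinatorCacheSketch`, `replayWithSingleSlot`, `replayWithPerGroupMap`, "commands", "events") are hypothetical. It simply counts how often a commit would hit the wrong node and how many coordinator lookups would be needed when offset commits alternate between groups whose coordinators live on different brokers.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative model only; this is NOT the kafka-clients TransactionManager. */
final class CoordinatorCacheSketch {

    /** Counters gathered while replaying a sequence of offset commits. */
    static final class Stats {
        int failedCommits;      // commits sent to a node that is not the group's coordinator
        int coordinatorLookups; // stand-in for FindCoordinatorRequest round trips
    }

    /** One remembered coordinator node, shared by every consumer group (behaviour described in the ticket). */
    static Stats replayWithSingleSlot(Map<String, Integer> actualCoordinator, List<String> commits) {
        Stats stats = new Stats();
        Integer cachedNode = null;
        for (String groupId : commits) {
            int actual = actualCoordinator.get(groupId);
            if (cachedNode == null) {
                // Nothing cached yet: look the coordinator up before the first commit.
                stats.coordinatorLookups++;
                cachedNode = actual;
            } else if (cachedNode != actual) {
                // The commit goes to the stale node, fails, and forces a new lookup.
                stats.failedCommits++;
                stats.coordinatorLookups++;
                cachedNode = actual;
            }
        }
        return stats;
    }

    /** One remembered coordinator node per consumerGroupId (the idea proposed in the ticket). */
    static Stats replayWithPerGroupMap(Map<String, Integer> actualCoordinator, List<String> commits) {
        Stats stats = new Stats();
        Map<String, Integer> nodeByGroupId = new HashMap<>();
        for (String groupId : commits) {
            if (!nodeByGroupId.containsKey(groupId)) {
                // First commit for this group: a single lookup, then the node is reused.
                stats.coordinatorLookups++;
                nodeByGroupId.put(groupId, actualCoordinator.get(groupId));
            }
        }
        return stats;
    }

    public static void main(String[] args) {
        // Hypothetical layout: two groups whose coordinators are on different brokers.
        Map<String, Integer> actual = Map.of("commands", 1, "events", 2);
        List<String> commits = List.of("commands", "events", "commands", "events");
        Stats single = replayWithSingleSlot(actual, commits);
        Stats perGroup = replayWithPerGroupMap(actual, commits);
        System.out.println("single slot: failed=" + single.failedCommits
                + ", lookups=" + single.coordinatorLookups);
        System.out.println("per group:   failed=" + perGroup.failedCommits
                + ", lookups=" + perGroup.coordinatorLookups);
    }
}
```

In this model the single-slot variant pays a failed commit plus a lookup on every switch between groups, which is where the retry backoff would apply, while the per-group map only pays one lookup per group. The model ignores coordinator moves, which a real client would still have to handle by invalidating the affected entry.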
[jira] [Commented] (KAFKA-9953) support multiple consumerGroupCoordinators in TransactionManager
[ https://issues.apache.org/jira/browse/KAFKA-9953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17104059#comment-17104059 ]

Ismael Juma commented on KAFKA-9953:

[~jwijgerd] Can you please submit a pull request?

cc [~bchen225242] [~hachikuji]
[jira] [Commented] (KAFKA-9953) support multiple consumerGroupCoordinators in TransactionManager
[ https://issues.apache.org/jira/browse/KAFKA-9953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17104137#comment-17104137 ]

Joost van de Wijgerd commented on KAFKA-9953:

[~ijuma] I created a PR, the link is now available in Issue Links.

cc [~bchen225242] [~hachikuji]
[jira] [Commented] (KAFKA-9953) support multiple consumerGroupCoordinators in TransactionManager
[ https://issues.apache.org/jira/browse/KAFKA-9953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17104893#comment-17104893 ]

Guozhang Wang commented on KAFKA-9953:

[~jwijgerd] I'm curious to learn about your scenario. More specifically, what makes you need three consumers instead of one consumer mapped to one producer? Are there throughput limits, or is it due to other reasons?
[jira] [Commented] (KAFKA-9953) support multiple consumerGroupCoordinators in TransactionManager
[ https://issues.apache.org/jira/browse/KAFKA-9953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17105573#comment-17105573 ]

Joost van de Wijgerd commented on KAFKA-9953:

[~guozhang] our scenario is the following: we have implemented a CQRS/EventSourcing framework on top of kafka using transactions, so we have a couple of ConsumerRecord types that we send around the system:

1) Commands
2) CommandResponses
3) DomainEvents

As all these types have different deserializers, it seemed logical to us to create different components to handle them. Every component (CommandServer, CommandClient, DomainEventListener) has its own consumer(group). We then have one thread (and one KafkaProducer linked to it) that polls all these consumers in sequence, starts a transaction if there is data and then commits it.

In our model we publish multiple DomainEvent types to the same topic (we have roughly 220 different DomainEvent types, so creating a partitioned topic for each didn't seem the right solution), and we sometimes need to read a different event type in our service. In that case it makes sense to create a new consumer(group) on the same topic that filters out that particular event type. If we had only one consumer(group) this wouldn't have been possible.

At the time we designed this, it seemed to make the most sense from an organizational perspective. Of course we didn't realize back then that the design of the kafka client library was one-to-one or one-to-many in terms of consumers vs producers. However, there is nothing stopping anybody from taking this approach, and I also think it is straightforward for the TransactionManager to support it (as I showed in my PR), so why not support it?

Anyway, I hope this makes sense and you will consider the PR.
[jira] [Commented] (KAFKA-9953) support multiple consumerGroupCoordinators in TransactionManager
[ https://issues.apache.org/jira/browse/KAFKA-9953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17105613#comment-17105613 ]

Guozhang Wang commented on KAFKA-9953:

Thanks [~jwijgerd]. Just to clarify, I'm not questioning the value of adding this support, and I think it is a good feature to have. I'm just curious to learn what your motivation is for having a many-to-one consumer -> producer mapping :)

My understanding is that there seem to be two cases that require more consumer groups: a) you have differently formatted data representing different components, which therefore needs different deserialization, and b) for data of a single format, it may need to be filtered on certain fields depending on what it is used for. Here are some wild ideas off the top of my head that you may consider. Of course, all of them are based on the assumption that with partitioned topics we will not saturate the network throughput before we overwhelm the broker:

For a), it is okay to have a single consumer fetching from different topics, since the serde API takes the topic name as a parameter, so you can select different deserialization logic based on it. That way you may need fewer consumer groups here.

For b), there is an additional overhead beyond the number of consumers: you would send the same bytes N times, each consumer filtering on different field values and dropping the rest on the floor, which is a bit wasteful :) An alternative approach would be to still have a single consumer that reads every DomainEvent, and to "defer" the filtering to a later stage. For example, after getting the data, put it into different buffers inside the client based on type, and then, depending on which types your client is currently interested in, poll only from the corresponding buffers while dropping the others. By doing so you only need to send the bytes once.
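Both ideas can be sketched in plain Java. This is a rough illustration under stated assumptions rather than a recommended implementation: the only thing taken from Kafka is the shape of `Deserializer#deserialize(String topic, byte[] data)`, and everything else (`SingleConsumerSketch`, `TopicRoutingDeserializer`, `TypeBuffers`, the topic and event type names) is hypothetical and deliberately free of any kafka-clients dependency.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.function.Function;

/** Illustrative sketch only; names are hypothetical and no Kafka classes are used. */
final class SingleConsumerSketch {

    /** Idea (a): choose the decoding logic from the topic name. */
    static final class TopicRoutingDeserializer {
        private final Map<String, Function<byte[], Object>> decoderByTopic = new HashMap<>();

        void register(String topic, Function<byte[], Object> decoder) {
            decoderByTopic.put(topic, decoder);
        }

        /** Same parameter shape as Kafka's Deserializer#deserialize(String topic, byte[] data). */
        Object deserialize(String topic, byte[] data) {
            Function<byte[], Object> decoder = decoderByTopic.get(topic);
            if (decoder == null) {
                throw new IllegalArgumentException("No decoder registered for topic " + topic);
            }
            return decoder.apply(data);
        }
    }

    /** Idea (b): read everything once, buffer by event type, consume only the types of interest. */
    static final class TypeBuffers {
        private final Map<String, Queue<Object>> bufferByType = new HashMap<>();

        /** Called for every event the single consumer returns. */
        void accept(String eventType, Object event) {
            bufferByType.computeIfAbsent(eventType, t -> new ArrayDeque<>()).add(event);
        }

        /** Returns the next buffered event of the given type, or null if there is none. */
        Object poll(String eventType) {
            Queue<Object> buffer = bufferByType.get(eventType);
            return buffer == null ? null : buffer.poll();
        }

        /** Drops everything buffered for a type the client is not interested in. */
        void discard(String eventType) {
            bufferByType.remove(eventType);
        }
    }

    public static void main(String[] args) {
        TopicRoutingDeserializer deserializer = new TopicRoutingDeserializer();
        deserializer.register("commands", bytes -> new String(bytes, StandardCharsets.UTF_8));
        deserializer.register("events", bytes -> Integer.valueOf(bytes.length));
        System.out.println(deserializer.deserialize("commands", "create".getBytes(StandardCharsets.UTF_8)));

        TypeBuffers buffers = new TypeBuffers();
        buffers.accept("AccountOpened", "event-1");
        buffers.accept("AccountClosed", "event-2");
        buffers.discard("AccountClosed");
        System.out.println(buffers.poll("AccountOpened"));
    }
}
```

One consequence of routing by topic is that the consumer's value type becomes `Object` (or a common supertype), because a single consumer now yields several Java types.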
[jira] [Commented] (KAFKA-9953) support multiple consumerGroupCoordinators in TransactionManager
[ https://issues.apache.org/jira/browse/KAFKA-9953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17106190#comment-17106190 ]

Joost van de Wijgerd commented on KAFKA-9953:

Hi [~guozhang], thanks for the information. I see that the SerDes pass in the topic, so this can indeed be used to select the proper deserializer in that case. I guess the typing on the Consumer and ConsumerRecords interfaces would have to be Object though, as I would then be mixing different java class types.

The other situation (b) is a bit more problematic for us. We have a constant stream of DomainEvents which are processed exactly once. If we wanted to process an event type from that stream (one that was already there but that we weren't interested in before), then rewinding the existing consumer would be hard to do (not impossible, of course) and it would block the processing of newer events until we had caught up again. In this scenario, adding a new consumer(group) is much simpler to implement (at the expense of network traffic, of course, but for us this is not an issue).

Arguably this could all be solved with a single consumer -> producer mapping, but unfortunately this is not the route we took. Since we are processing financial transactions with this system, I am very wary of rewriting our consumer logic, as we would have to drop our current consumer groups and create a new one (for the whole service), and if we are not careful we might miss transactions. Another solution would be to pair a producer to each consumer, but that would add extra overhead as well.

Maybe our use case / implementation is a bit out of the ordinary, but the fact is that the kafka-clients library allows this construct and it works, except that a latency of around 105ms is introduced with the standard settings because the TransactionManager has to keep refreshing the groupCoordinator Node.
So, best practices notwithstanding, I think it makes sense for the kafka-clients library to support many-to-one consumer -> producer mappings in a performant way. If you decide not to, then maybe at least add a WARN log when this situation is detected inside the TransactionManager, to alert developers that they are doing something that will hurt the performance of their app.

Obviously my preference is to have this fix in the kafka-clients library ;)
[jira] [Commented] (KAFKA-9953) support multiple consumerGroupCoordinators in TransactionManager
[ https://issues.apache.org/jira/browse/KAFKA-9953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17135011#comment-17135011 ]

Boyang Chen commented on KAFKA-9953:

Thanks for reporting the use case, [~jwijgerd]. At a minimum, the EOS model has a recommended usage pattern that supports a one -> one mapping from Consumer to Producer. My concern with extending the logic is the impact on new users when they start writing EOS apps: should we also extend the example with a many -> one mapping? Doesn't the API then make it harder for the user to get things right, for example by unnecessarily mixing the group.id or group metadata of different consumer groups? Also, the consumer groups rebalance at separate times, which makes the scenario even more complicated.

As far as I can tell, the scenario discussed here is not common, so would it be possible for you to just start more producers to match the consumers? How much impact would that have on your production system, in terms of stability and cost?
[jira] [Commented] (KAFKA-9953) support multiple consumerGroupCoordinators in TransactionManager
[ https://issues.apache.org/jira/browse/KAFKA-9953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17348711#comment-17348711 ]

Joost van de Wijgerd commented on KAFKA-9953:

nudge