[jira] [Commented] (KAFKA-9953) support multiple consumerGroupCoordinators in TransactionManager

2021-05-20 Thread Joost van de Wijgerd (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17348711#comment-17348711
 ] 

Joost van de Wijgerd commented on KAFKA-9953:
-

nudge

> support multiple consumerGroupCoordinators in TransactionManager
> 
>
> Key: KAFKA-9953
> URL: https://issues.apache.org/jira/browse/KAFKA-9953
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.5.0
>Reporter: Joost van de Wijgerd
>Priority: Major
> Attachments: KAFKA-9953.patch
>
>
> We are using Kafka with a transactional producer and have the following use 
> case:
> 3 KafkaConsumers (each with their own ConsumerGroup) polled by the same 
> thread, and 1 transactional Kafka producer. When we add the offsets to the 
> transaction we run into the following problem:
> The TransactionManager only keeps track of 1 consumerGroupCoordinator; 
> however, some consumerGroupCoordinators may be on another node. We then 
> constantly see the TransactionManager switching between nodes, with the 
> overhead of 1 failing _TxnOffsetCommitRequest_ and 1 unnecessary 
> _FindCoordinatorRequest_ per switch.
> Also, with _retry.backoff.ms_ set to its default of 100, this causes a pause 
> of 100ms for every other transaction (depending on which KafkaConsumer 
> triggered the transaction, of course).
> If the TransactionManager could keep track of coordinator nodes per 
> consumerGroupId this problem would be solved.
> I already have a patch for this but still need to test it. I will add it to 
> the ticket when that is done.
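For illustration, here is a minimal sketch of the usage pattern described in 
the quoted ticket: several consumers, each with its own group, polled on one 
thread, committing offsets through a single transactional producer. The class 
name, group ids and surrounding setup are hypothetical; only the kafka-clients 
API calls are real.

{code:java}
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.TopicPartition;

public class MultiGroupTransactionLoop {

    // Assumes producer.initTransactions() has already been called.
    static void pollOnce(Map<String, KafkaConsumer<String, String>> consumersByGroupId,
                         KafkaProducer<String, String> producer) {
        for (Map.Entry<String, KafkaConsumer<String, String>> entry : consumersByGroupId.entrySet()) {
            ConsumerRecords<String, String> records = entry.getValue().poll(Duration.ofMillis(10));
            if (records.isEmpty()) {
                continue;
            }
            producer.beginTransaction();
            // ... transform `records` and producer.send(...) the results here ...
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            for (TopicPartition tp : records.partitions()) {
                List<ConsumerRecord<String, String>> rs = records.records(tp);
                offsets.put(tp, new OffsetAndMetadata(rs.get(rs.size() - 1).offset() + 1));
            }
            // A different group id on each iteration: if the group coordinators live
            // on different brokers, a TransactionManager that caches only one
            // coordinator rediscovers it on every switch (one failing
            // TxnOffsetCommitRequest plus one FindCoordinatorRequest).
            producer.sendOffsetsToTransaction(offsets, entry.getKey());
            producer.commitTransaction();
        }
    }
}
{code}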





[jira] [Commented] (KAFKA-9071) transactional.id.expiration.ms config value should be implemented as a Long

2021-05-20 Thread Joost van de Wijgerd (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9071?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17348710#comment-17348710
 ] 

Joost van de Wijgerd commented on KAFKA-9071:
-

nudge

> transactional.id.expiration.ms config value should be implemented as a Long
> ---
>
> Key: KAFKA-9071
> URL: https://issues.apache.org/jira/browse/KAFKA-9071
> Project: Kafka
>  Issue Type: Improvement
>  Components: config
>Affects Versions: 2.3.0
>Reporter: Joost van de Wijgerd
>Assignee: Mario Georgiev
>Priority: Major
>
> Currently the value of this config parameter is limited to MAX_INT, 
> effectively limiting the transactional id expiration to ~25 days. This is 
> causing some issues for us on our Acceptance environment (which is not used 
> that often / heavily), where our transactional services will start failing 
> because of this issue.
> I believe best practice for millisecond values is to implement them as a 
> Long and not as an Integer.
> This is currently the max value: transactional.id.expiration.ms=2147483647
> while I would like to set it to: transactional.id.expiration.ms=31540000000 
> (i.e. 1 year)
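For reference, a small arithmetic sketch of the limit described above; only 
Integer.MAX_VALUE comes from the ticket, the rest is plain unit conversion.

{code:java}
public class ExpirationLimitSketch {
    public static void main(String[] args) {
        long maxIntMs = Integer.MAX_VALUE;                  // 2147483647 ms
        double days = maxIntMs / (1000.0 * 60 * 60 * 24);   // ~24.9 days
        long oneYearMs = 365L * 24 * 60 * 60 * 1000;        // 31536000000 ms
        // oneYearMs does not fit in an int, so an Integer-typed config can
        // never express it; a Long-typed config (Long.parseLong instead of
        // Integer.parseInt) would.
        System.out.printf("cap: %.1f days, one year: %d ms%n", days, oneYearMs);
    }
}
{code}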





[jira] [Commented] (KAFKA-9953) support multiple consumerGroupCoordinators in TransactionManager

2020-06-29 Thread Joost van de Wijgerd (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17147635#comment-17147635
 ] 

Joost van de Wijgerd commented on KAFKA-9953:
-

Hi [~bchen225242],

I agree that we are not using the recommended usage pattern. The issue, 
however, is that the pattern we are using is fully functional (we have been 
running it in production for 9 months now). But because of the implementation 
detail that the TransactionManager only supports one GroupCoordinator, it 
keeps 'flipping' between group coordinators, and due to the default retry 
backoff a 100ms time penalty is incurred every time this happens (since 
discovering this issue we have set the backoff to 0ms). I have patched 
kafka-clients 2.5.0 with my fix and we are currently running it in production 
with no issues whatsoever.
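As a side note, a hedged sketch of the mitigation mentioned above (the 
transactional id value is hypothetical): dropping the retry backoff removes 
the ~100ms pause per coordinator 'flip', at the cost of tighter immediate 
retries.

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class ProducerPropsSketch {
    static Properties producerProps() {
        Properties props = new Properties();
        // hypothetical transactional id
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-service-tx");
        // default is 100; 0 avoids the ~100ms pause on every coordinator 'flip'
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "0");
        return props;
    }
}
{code}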

As to your point about the consumer groups rebalancing: correct me if I am 
wrong, but I think this has no impact on the location of the 
ConsumerGroupCoordinator on the Broker. My fix merely keeps track of which 
Broker hosts a given ConsumerGroupCoordinator, so I don't see how this would 
be an issue.

I do agree with you that if you use the many -> one mapping you cannot/should 
not use the automatic rebalancing. We are indeed using our own assignment 
strategy, because we want partitions of different Topics with the same ordinal 
to map to the same application instance (see the sketch below). If we let 
Kafka do the allocation, I indeed think this pattern would not work correctly.
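To illustrate, one way to realize such an assignment strategy is manual 
assignment, which indeed bypasses the automatic rebalancing; the topic names 
and instance arithmetic here are hypothetical.

{code:java}
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class OrdinalAssignmentSketch {
    // Pin same-ordinal partitions of several topics to this application instance.
    static void assignOrdinals(KafkaConsumer<String, String> consumer,
                               int instanceId, int instanceCount, int partitionsPerTopic) {
        List<TopicPartition> mine = new ArrayList<>();
        for (String topic : List.of("commands", "command-responses", "domain-events")) {
            for (int p = instanceId; p < partitionsPerTopic; p += instanceCount) {
                mine.add(new TopicPartition(topic, p));
            }
        }
        consumer.assign(mine); // no group rebalance protocol involved
    }
}
{code}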

I am sticking to my standpoint that implementing this improvement does not 
hurt the current recommended pattern at all, while it does support the 
many-to-one pattern in a performant way. I don't think you have to update your 
documentation for this unless you want to specifically point it out to your 
users.

If you decide not to implement this improvement, I would opt to log a WARN 
message that alerts the developer to this issue so they can fix the problem at 
an early stage of development. (Currently there is an INFO message when a new 
ConsumerGroupCoordinator is found; this was my only clue to finding the 
problem, and unfortunately it came after we had implemented our framework 
around the many-consumers > one-producer concept.)

To answer your question: implementing a proper switch to the one-to-one 
Consumer/Producer mapping would be a big change for us. Pairing extra 
Producers with our existing Consumers should be a lot easier, but we would 
essentially be using them to implement the map of ConsumerGroupCoordinators, 
so for me it is then a better option to run with a patched kafka-clients 
library. In the long run, however, this is also not very sustainable.

Best Regards,
Joost



[jira] [Commented] (KAFKA-9953) support multiple consumerGroupCoordinators in TransactionManager

2020-05-13 Thread Joost van de Wijgerd (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17106190#comment-17106190
 ] 

Joost van de Wijgerd commented on KAFKA-9953:
-

Hi [~guozhang], thanks for the information. Indeed, I see that the SerDes pass 
in the topic, so this can be used to select the proper deserializer in that 
case. I guess the typing on the Consumer and ConsumerRecords interfaces would 
have to be Object though, as I am now mixing different Java class types.
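As a sketch of that approach: a deserializer that dispatches on the topic name 
the SerDe API passes in. CommandDeserializer and DomainEventDeserializer are 
hypothetical application classes; the value type is Object because the class 
types are mixed.

{code:java}
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;

public class TopicDispatchingDeserializer implements Deserializer<Object> {

    private final Map<String, Deserializer<?>> byTopic = Map.of(
            "commands", new CommandDeserializer(),          // hypothetical
            "domain-events", new DomainEventDeserializer()  // hypothetical
    );

    @Override
    public Object deserialize(String topic, byte[] data) {
        return byTopic.get(topic).deserialize(topic, data);
    }
}
{code}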

The other situation (b) is a bit more problematic for us. We have a constant 
stream of DomainEvents which are processed exactly once. If we would like to 
process an event from that stream (one that was already there, but that we 
weren't interested in before), rewinding the existing consumer would be hard 
to do (not impossible, of course) and it would block processing of newer 
events until we had caught up again. In this scenario, adding a new 
consumer(group) would be much simpler to implement (at the expense of network 
traffic, of course, but for us this is not an issue).
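A hedged sketch of that alternative: a fresh consumer group (the group id 
below is hypothetical) has no committed offsets, so with 
auto.offset.reset=earliest it re-reads the stream from the start without 
rewinding or blocking the existing consumer.

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class ReplayGroupSketch {
    static Properties replayConsumerProps(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "new-projection-group"); // hypothetical
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }
}
{code}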

Arguably this could all be solved with a single consumer -> producer mapping, 
but unfortunately this is not the route we took. Since we are processing 
financial transactions with this system, I am very wary of rewriting our 
consumer logic: we would have to drop our current consumer groups and create a 
new one (for the whole service), and if we are not careful we might miss 
transactions. Another solution would be to pair a producer with each consumer, 
but that would add extra overhead as well.

Maybe our use case / implementation is a bit out of the ordinary, but the fact 
is that the kafka-clients library allows this construct and it also works, 
except that a latency of around 105ms is introduced with the standard settings 
because the TransactionManager has to keep refreshing the groupCoordinator 
Node. So, best practices notwithstanding, I think it makes sense for the 
kafka-clients library to support many-to-one consumer -> producer mappings in 
a performant way. If you decide not to, then maybe at least add a WARN log 
when this situation is detected inside the TransactionManager, to alert 
developers that they are doing something wrong that will hurt the performance 
of their app.

Obviously my preference is to have this fix in the kafka-clients library ;)



[jira] [Commented] (KAFKA-9071) transactional.id.expiration.ms config value should be implemented as a Long

2020-05-12 Thread Joost van de Wijgerd (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9071?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17105584#comment-17105584
 ] 

Joost van de Wijgerd commented on KAFKA-9071:
-

Hi, what's the status on this? It would be great if you would consider this PR 
for the next release.



[jira] [Comment Edited] (KAFKA-9953) support multiple consumerGroupCoordinators in TransactionManager

2020-05-12 Thread Joost van de Wijgerd (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17105573#comment-17105573
 ] 

Joost van de Wijgerd edited comment on KAFKA-9953 at 5/12/20, 4:38 PM:
---

[~guozhang] our scenario is the following: 

We have implemented a CQRS/EventSourcing framework on top of Kafka using 
transactions, so we have a couple of different ConsumerRecord value types we 
send around the system:

1) Commands

2) CommandResponses

3) DomainEvents

As all these types have different deserializers, it seemed logical to us to 
create different components to handle them. Every component (CommandServer, 
CommandClient, DomainEventListener) has its own consumer(group)s. We then 
have one thread (and one transactional KafkaProducer linked to it) that polls 
all these consumers in sequence, starts a transaction if there is data, and 
then commits it.

We partition our data on userId and have a significant number of partitions 
per topic to ensure we have sufficient throughput / scalability.

In our model we publish multiple DomainEvent types to the same topic (we have 
roughly 220 different DomainEvent types, so creating a partitioned topic for 
each didn't seem the right solution), and we sometimes need to read a 
different event type in our service. In that case it makes sense to create a 
new consumer(group) on the same topic that filters out that particular event 
type. If we had had only one consumer(group), this wouldn't have been 
possible.
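To make that concrete, a hedged sketch of such a filtering consumer loop; the 
"eventType" record header and the type tag are hypothetical conventions, not 
something from the ticket.

{code:java}
import java.nio.charset.StandardCharsets;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.header.Header;

public class EventTypeFilterSketch {
    // Keep only one DomainEvent type from a topic that carries many of them.
    static void handleOnly(ConsumerRecords<String, byte[]> records, String wantedType) {
        for (ConsumerRecord<String, byte[]> rec : records) {
            Header h = rec.headers().lastHeader("eventType"); // hypothetical header
            if (h != null && wantedType.equals(new String(h.value(), StandardCharsets.UTF_8))) {
                // ... dispatch rec to the interested component ...
            }
        }
    }
}
{code}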

At the time we designed this, it seemed to make the most sense from an 
organizational perspective; of course, we didn't realize back then that the 
Kafka client library was designed for one-to-one or one-to-many mappings of 
consumers to producers. However, there is nothing stopping anybody from 
taking this approach, and I think it is straightforward for the 
TransactionManager to support it (as I showed in my PR), so why not support 
it?

Anyway, I hope this makes sense and you will consider the PR.







[jira] [Commented] (KAFKA-9953) support multiple consumerGroupCoordinators in TransactionManager

2020-05-11 Thread Joost van de Wijgerd (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17104137#comment-17104137
 ] 

Joost van de Wijgerd commented on KAFKA-9953:
-

[~ijuma] I created a PR; the link is now available in Issue Links. cc 
[~bchen225242] [~hachikuji]



[jira] [Created] (KAFKA-9953) support multiple consumerGroupCoordinators in TransactionManager

2020-05-04 Thread Joost van de Wijgerd (Jira)
Joost van de Wijgerd created KAFKA-9953:
---

 Summary: support multiple consumerGroupCoordinators in 
TransactionManager
 Key: KAFKA-9953
 URL: https://issues.apache.org/jira/browse/KAFKA-9953
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Affects Versions: 2.5.0
Reporter: Joost van de Wijgerd


We are using Kafka with a transactional producer and have the following use 
case:

3 KafkaConsumers (each with their own ConsumerGroup) polled by the same thread, 
and 1 transactional Kafka producer. When we add the offsets to the transaction 
we run into the following problem:

The TransactionManager only keeps track of 1 consumerGroupCoordinator; however, 
some consumerGroupCoordinators may be on another node. We then constantly see 
the TransactionManager switching between nodes, with the overhead of 1 failing 
_TxnOffsetCommitRequest_ and 1 unnecessary _FindCoordinatorRequest_ per switch.

Also, with _retry.backoff.ms_ set to its default of 100, this causes a pause of 
100ms for every other transaction (depending on which KafkaConsumer triggered 
the transaction, of course).

If the TransactionManager could keep track of coordinator nodes per 
consumerGroupId this problem would be solved.

I already have a patch for this but still need to test it. I will add it to the 
ticket when that is done.





[jira] [Created] (KAFKA-9071) transactional.id.expiration.ms config value should be implemented as a Long

2019-10-21 Thread Joost van de Wijgerd (Jira)
Joost van de Wijgerd created KAFKA-9071:
---

 Summary: transactional.id.expiration.ms config value should be 
implemented as a Long
 Key: KAFKA-9071
 URL: https://issues.apache.org/jira/browse/KAFKA-9071
 Project: Kafka
  Issue Type: Improvement
  Components: config
Affects Versions: 2.3.0
Reporter: Joost van de Wijgerd


Currently the value of this config parameter is limited to MAX_INT, effectively 
limiting the transactional id expiration to ~25 days. This is causing some 
issues for us on our Acceptance environment (which is not used that often / 
heavily), where our transactional services will start failing because of this 
issue.

I believe best practice for millisecond values is to implement them as a Long 
and not as an Integer.

This is currently the max value: transactional.id.expiration.ms=2147483647

while I would like to set it to: transactional.id.expiration.ms=31540000000 
(i.e. 1 year)


