[ https://issues.apache.org/jira/browse/KAFKA-8635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Denis Washington updated KAFKA-8635:
------------------------------------
    Description: 
In our Kafka Streams applications (with EOS enabled), we were seeing mysterious 
long delays between records being produced by a stream task and the same 
records being consumed by the next task. These delays turned out to always be 
around {{retry.backoff.ms}} long; reducing that value reduced the delays by 
about the same amount.
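
For anyone who wants to reproduce the observation, here is a minimal sketch of how the producer backoff can be lowered in a Streams application. It uses plain string keys so it compiles without the Kafka jars; the application id, bootstrap address and the value passed in are placeholders, not our real settings. The {{producer.}} prefix is how Kafka Streams routes a setting to its internal producers. Lowering the backoff only shrinks the symptom and also affects ordinary retries, so this is a diagnostic aid rather than a fix.

```java
import java.util.Properties;

/** Sketch only: builds a Streams configuration with a reduced producer retry backoff. */
class EosBackoffConfigSketch {

    static Properties build(String producerRetryBackoffMs) {
        Properties props = new Properties();
        props.put("application.id", "eos-delay-demo");     // placeholder name
        props.put("bootstrap.servers", "localhost:9092");  // placeholder address
        props.put("processing.guarantee", "exactly_once"); // EOS enabled, as in the report
        // "producer." prefix: applies the setting to the producers created by Streams
        props.put("producer.retry.backoff.ms", producerRetryBackoffMs);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build("10"));
    }
}
```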

After digging further, I pinned down the problem to the following lines in 
{{org.apache.kafka.clients.producer.internals.Sender#runOnce}}:

{{} else if (transactionManager.hasInFlightTransactionalRequest() || maybeSendTransactionalRequest()) {}}
{{     // as long as there are outstanding transactional requests, we simply wait for them to return}}
{{     client.poll(retryBackoffMs, time.milliseconds());}}
{{     return;}}
{{}}}

This code seems to assume that, if {{maybeSendTransactionalRequest}} returns 
true, a transactional request has been sent out that should be waited for. 
However, this is not true if the request requires a coordinator lookup:

{{if (nextRequestHandler.needsCoordinator()) {}}
{{     targetNode = transactionManager.coordinator(nextRequestHandler.coordinatorType());}}
{{     if (targetNode == null) {}}
{{          transactionManager.lookupCoordinator(nextRequestHandler);}}
{{          break;}}
{{     }}}
{{     ...}}

{{lookupCoordinator()}} does not actually send anything; it only enqueues a 
coordinator lookup request for the {{Sender}}'s next run loop iteration. 
{{maybeSendTransactionalRequest}} still returns true, though (the {{break}} 
jumps to a {{return true}} at the end of the method), so the {{Sender}} waits 
needlessly in {{client.poll()}} for up to {{retry.backoff.ms}} even though no 
request is actually in flight.

I _think_ the fix is to let {{maybeSendTransactionalRequest}} return false if 
it merely enqueues the coordinator lookup instead of actually sending anything. 
But I'm not sure, hence the bug report instead of a pull request.
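
To make the suspected mechanism concrete, here is a small self-contained toy model of the loop. It is not Kafka code: the class, the string request names, the 2 ms round trip and the simulated clock are all made up for illustration, and the coordinator lookup is assumed to always succeed. The only thing it tries to capture is that a {{poll}} with nothing in flight blocks for the full backoff, and that returning false after merely enqueuing the lookup avoids that.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model of the Sender loop; every name here is hypothetical, not Kafka's API. */
class SenderLoopSketch {
    static final long RETRY_BACKOFF_MS = 100; // stand-in for retry.backoff.ms
    static final long ROUND_TRIP_MS = 2;      // assumed broker round-trip time

    private final Deque<String> pending = new ArrayDeque<>();
    private final boolean returnFalseOnLookup; // true = behaviour proposed in this report
    private boolean coordinatorKnown = false;
    private boolean requestInFlight = false;
    private long nowMs = 0;                    // simulated clock

    private SenderLoopSketch(boolean returnFalseOnLookup) {
        this.returnFalseOnLookup = returnFalseOnLookup;
        pending.add("AddPartitionsToTxn");     // one request that needs the coordinator
    }

    /** Returns true if the caller should wait in poll(). */
    private boolean maybeSendTransactionalRequest() {
        String next = pending.peek();
        if (next == null) {
            return false;
        }
        boolean needsCoordinator = !next.equals("FindCoordinator");
        if (needsCoordinator && !coordinatorKnown) {
            // Like lookupCoordinator(): only enqueues, nothing goes on the wire.
            pending.addFirst("FindCoordinator");
            return !returnFalseOnLookup;
        }
        pending.poll();
        requestInFlight = true;
        if (!needsCoordinator) {
            coordinatorKnown = true;           // simplification: lookup always succeeds
        }
        return true;
    }

    /** Returns early when a response arrives, otherwise blocks for the whole timeout. */
    private void poll(long timeoutMs) {
        nowMs += requestInFlight ? Math.min(ROUND_TRIP_MS, timeoutMs) : timeoutMs;
        requestInFlight = false;
    }

    /** Runs the loop until the queue is drained and returns the simulated elapsed time. */
    static long simulate(boolean returnFalseOnLookup) {
        SenderLoopSketch sender = new SenderLoopSketch(returnFalseOnLookup);
        while (!sender.pending.isEmpty()) {
            if (sender.maybeSendTransactionalRequest()) {
                sender.poll(RETRY_BACKOFF_MS);
            }
        }
        return sender.nowMs;
    }

    public static void main(String[] args) {
        System.out.println("current behaviour: " + simulate(false) + " ms");
        System.out.println("proposed change:   " + simulate(true) + " ms");
    }
}
```

In this model the two runs differ by exactly one backoff interval (104 ms versus 4 ms), which matches the delays we observed. Whether the real {{Sender}} can safely skip the wait in this case is the open question of this report.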


> Unnecessary wait when looking up coordinator before transactional request
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-8635
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8635
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 2.3.0, 2.2.1
>            Reporter: Denis Washington
>            Priority: Major



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
