[ https://issues.apache.org/jira/browse/KAFKA-8635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Denis Washington updated KAFKA-8635:
------------------------------------
Description:
In our Kafka Streams applications (with EOS enabled), we were seeing mysterious
long delays between records being produced by a stream task and the same
records being consumed by the next task. These delays turned out to always be
around {{retry.backoff.ms}} long; reducing that value reduced the delays by
about the same amount.
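For reference, here is a minimal sketch of how the backoff can be lowered through the Streams configuration to reproduce this observation (the application id, bootstrap servers, and the 10 ms value are placeholders):
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class BackoffExperiment {
    static Properties streamsProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        // Forward retry.backoff.ms to the internal producer; lowering it from
        // the default 100 ms shrank the observed delays by about the same amount.
        props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRY_BACKOFF_MS_CONFIG), 10);
        return props;
    }
}
{code}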
After digging further, I pinned down the problem to the following lines in
{{org.apache.kafka.clients.producer.internals.Sender#runOnce}}:
{code:java}
} else if (transactionManager.hasInFlightTransactionalRequest() || maybeSendTransactionalRequest()) {
    // as long as there are outstanding transactional requests, we simply wait for them to return
    client.poll(retryBackoffMs, time.milliseconds());
    return;
}
{code}
This code seems to assume that, if {{maybeSendTransactionalRequest}} returns
true, a transactional request has been sent out that should be waited for.
However, this is not true if the request requires a coordinator lookup:
{code:java}
if (nextRequestHandler.needsCoordinator()) {
    targetNode = transactionManager.coordinator(nextRequestHandler.coordinatorType());
    if (targetNode == null) {
        transactionManager.lookupCoordinator(nextRequestHandler);
        break;
    }
    ...
{code}
{{lookupCoordinator()}} does not actually send anything; it merely enqueues a
coordinator lookup request for the {{Sender}}'s next run loop iteration.
{{maybeSendTransactionalRequest}} still returns true, though (the {{break}}
jumps to a {{return true}} at the end of the method), leading the {{Sender}} to
needlessly wait in {{client.poll()}} even though no request is actually in
flight.
I _think_ the fix is to let {{maybeSendTransactionalRequest}} return false if
it merely enqueues the coordinator lookup instead of actually sending anything.
But I'm not sure, hence the bug report instead of a pull request.
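Untested, but the change I have in mind would look roughly like this inside {{maybeSendTransactionalRequest}} (a sketch only; whether {{return false}} is the right mechanism given the surrounding loop is exactly what I am unsure about):
{code:java}
if (nextRequestHandler.needsCoordinator()) {
    targetNode = transactionManager.coordinator(nextRequestHandler.coordinatorType());
    if (targetNode == null) {
        // lookupCoordinator() only enqueues a FindCoordinator request for the
        // next run loop iteration; nothing is in flight yet, so report that
        // no transactional request was sent instead of breaking out to the
        // "return true" at the end of the method.
        transactionManager.lookupCoordinator(nextRequestHandler);
        return false;
    }
    ...
{code}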
> Unnecessary wait when looking up coordinator before transactional request
> -------------------------------------------------------------------------
>
> Key: KAFKA-8635
> URL: https://issues.apache.org/jira/browse/KAFKA-8635
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 2.3.0, 2.2.1
> Reporter: Denis Washington
> Priority: Major
>