[ https://issues.apache.org/jira/browse/KAFKA-19804?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18034215#comment-18034215 ]
Kuan Po Tseng commented on KAFKA-19804:
---------------------------------------
Hi [~shivsundar],
It turns out the blocking issue does exist; I spoke too soon last time. Sorry for
the confusion. Here's what I found:
Let’s say there are already some records in the topic, and we call
{{poll(Duration.ofSeconds(10))}} with:
* consumer heartbeat interval = 30s
* broker heartbeat interval = 5s
* auto-commit interval = 5s
* auto.offset.reset = earliest
*Behavior:*
# {{subscribe}} + {{enable.auto.commit=true}} → first {{poll()}} blocks for
around 5s, then returns records
# {{subscribe}} + {{enable.auto.commit=false}} → first {{poll()}} blocks for
around 10s (the poll timeout), then returns records
# {{assign}} → first {{poll()}} doesn’t block and we get records immediately
*Why it happens:*
In (1) and (2), no Fetch request is sent to the broker during the first
{{poll()}}, because we haven't received any partition assignment yet, i.e. the
first heartbeat response hasn't arrived. As a result,
{{pendingFetchRequestFuture}} is completed and set to null
{^}[1]{^}{^}[2]{^}{^}[3]{^}, which prevents FetchRequestManager from issuing
further fetches {^}[4]{^}. This in turn leaves the first {{poll()}} waiting on
the fetch buffer until the wait times out {^}[5]{^}.
* For (1), since auto-commit is on, the wait time is basically
{code:java}
Math.min(applicationEventHandler.maximumTimeToWait(), timer.remainingMs())
{code}
which ends up being the auto-commit interval (5s).
* For (2), it simply uses the {{pollTimeout}} (10s).
* For (3), since partitions are already assigned, {{AbstractFetch}} can start
fetching records right away, so there’s no blocking.
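To make the three cases concrete, here is a simplified, hypothetical model of the first {{poll()}} wait. It only encodes the {{Math.min(...)}} expression above, and it assumes auto-commit is the only background task contributing a deadline (other request managers are ignored). {{FirstPollWaitModel}} and {{firstPollWaitMs}} are made-up names for illustration; this is not the actual {{AsyncKafkaConsumer}} logic.

{code:java}
/**
 * Hypothetical model of the first poll() wait; not the real AsyncKafkaConsumer
 * code. It only encodes Math.min(maximumTimeToWait, timer.remainingMs()).
 */
public class FirstPollWaitModel {

    /**
     * Upper bound, in ms, on how long the first poll() waits on the fetch buffer.
     *
     * @param assignedManually true for assign(), false for subscribe()
     */
    static long firstPollWaitMs(boolean assignedManually,
                                boolean autoCommitEnabled,
                                long autoCommitIntervalMs,
                                long pollTimeoutMs) {
        if (assignedManually) {
            // Case (3): partitions are already known, so a Fetch goes out on the
            // first poll(); modeled here as no timer-driven wait at all.
            return 0L;
        }
        // Cases (1) and (2): no assignment yet, so no Fetch is in flight and the
        // wait only ends when the computed timeout expires.
        // Assumption: auto-commit is the only background task with a deadline.
        long maximumTimeToWait = autoCommitEnabled ? autoCommitIntervalMs : Long.MAX_VALUE;
        return Math.min(maximumTimeToWait, pollTimeoutMs);
    }

    public static void main(String[] args) {
        long pollTimeoutMs = 10_000L;       // poll(Duration.ofSeconds(10))
        long autoCommitIntervalMs = 5_000L; // auto-commit interval = 5s
        // prints "(1) 5000"
        System.out.println("(1) " + firstPollWaitMs(false, true, autoCommitIntervalMs, pollTimeoutMs));
        // prints "(2) 10000"
        System.out.println("(2) " + firstPollWaitMs(false, false, autoCommitIntervalMs, pollTimeoutMs));
        // prints "(3) 0"
        System.out.println("(3) " + firstPollWaitMs(true, true, autoCommitIntervalMs, pollTimeoutMs));
    }
}
{code}

In case (3) the real wait is bounded by fetch latency rather than a timer; the model returns 0 only to show that no timeout is involved.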
Now I’m looking into how to reduce or avoid this initial blocking on the first
{{poll()}} call. Any suggestions or feedback is welcome :)
I’ve also submitted a draft PR if you’d like to take a look.
[1]: [https://github.com/apache/kafka/blob/5c49b48eb0732043400ec1ffb23022ebb3b47085/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java#L438]
[2]: [https://github.com/apache/kafka/blob/9e424755d4d236442847b13863580f44f27e22a6/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java#L162]
[3]: [https://github.com/apache/kafka/blob/9e424755d4d236442847b13863580f44f27e22a6/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java#L171]
[4]: [https://github.com/apache/kafka/blob/9e424755d4d236442847b13863580f44f27e22a6/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java#L142]
[5]: [https://github.com/apache/kafka/blob/10f26c86297dd2770cd7c93e35b27d4c4ceb0e1c/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1797]
> Improve heartbeat request manager initial HB interval
> ------------------------------------------------------
>
> Key: KAFKA-19804
> URL: https://issues.apache.org/jira/browse/KAFKA-19804
> Project: Kafka
> Issue Type: Task
> Components: clients, consumer
> Reporter: Lianet Magrans
> Assignee: Kuan Po Tseng
> Priority: Major
> Fix For: 4.2.0
>
>
> With KIP-848, the consumer HB interval config moved to the broker, so currently
> the consumer HB request manager starts with a 0ms interval (mainly to send a
> first HB right away after the consumer subscribe + poll). Once a response is
> received, the consumer takes the interval from the response and starts using
> it.
> That 0ms initial interval means the HB mgr poll continuously executes logic in
> a tight loop that may not really be needed, since it mostly has to wait for a
> response (or a failure).
> Probably worse than this is the impact on the app thread, given that
> pollTimeout takes into account the maxTimeToWait from the network thread,
> which is directly impacted by the timeToNextHeartbeat:
> * [https://github.com/apache/kafka/blob/388739f5d847d7a16e389d9891f806547f023476/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1764-L1766]
> * [https://github.com/apache/kafka/blob/781bc7a54b8c4f7c86f0d6bb9ef8399d86d0735e/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java#L255]
> We should review and consider setting a non-zero initial interval (while we
> wait for the actual interval from the broker). One option to consider might be
> the request timeout (just a first thought).
> High level goals here would be to:
> * maintain the behaviour of sending a first HB without delay
> * ensure no unneeded activity on the HB mgr poll in the background, in a tight
> loop, while we're just waiting for the first HB response with an interval
> * ensure the app thread poll timeout is not affected
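A minimal sketch of the direction described above, under stated assumptions: {{ProvisionalHeartbeatInterval}} and its methods are hypothetical (not Kafka's {{AbstractHeartbeatRequestManager}} API), the provisional interval is the request timeout as floated in the description, and the app-thread timeout is modeled as only {{min(remaining poll time, timeToNextHeartbeat)}}, ignoring other request managers. It walks through the three goals: the first heartbeat is due at once, the timer does not sit at 0ms while the first response is pending, and a 10s {{poll()}} is not shortened.

{code:java}
/**
 * Hypothetical sketch, not Kafka's AbstractHeartbeatRequestManager: the first
 * heartbeat is due immediately, a provisional non-zero interval covers the wait
 * for the first response, and the broker-provided interval takes over after that.
 */
public class ProvisionalHeartbeatInterval {

    private final long provisionalIntervalMs; // assumption: request.timeout.ms
    private long brokerIntervalMs = -1L;      // -1 until the first response arrives
    private long nextDueMs = 0L;              // 0: the first heartbeat is due right away

    public ProvisionalHeartbeatInterval(long provisionalIntervalMs) {
        if (provisionalIntervalMs <= 0) {
            throw new IllegalArgumentException("provisional interval must be > 0");
        }
        this.provisionalIntervalMs = provisionalIntervalMs;
    }

    /** Milliseconds until the next heartbeat should be sent; 0 means send now. */
    public long timeToNextHeartbeatMs(long nowMs) {
        return Math.max(0L, nextDueMs - nowMs);
    }

    /** Called when a heartbeat goes out; schedules the next one. */
    public void onHeartbeatSent(long nowMs) {
        // Until the broker has sent its interval, use the provisional one so the
        // background poll does not spin on a 0ms timer.
        long interval = brokerIntervalMs > 0 ? brokerIntervalMs : provisionalIntervalMs;
        nextDueMs = nowMs + interval;
    }

    /** Called on a successful response carrying the broker's interval. */
    public void onHeartbeatResponse(long nowMs, long intervalFromBrokerMs) {
        boolean first = brokerIntervalMs < 0;
        brokerIntervalMs = intervalFromBrokerMs;
        if (first) {
            // Replace the provisional deadline with one based on the real interval.
            nextDueMs = nowMs + intervalFromBrokerMs;
        }
    }

    /** Simplified app-thread timeout: only the poll timer and the heartbeat timer. */
    static long appPollTimeoutMs(long remainingPollMs, long timeToNextHeartbeatMs) {
        return Math.min(remainingPollMs, timeToNextHeartbeatMs);
    }

    public static void main(String[] args) {
        ProvisionalHeartbeatInterval hb = new ProvisionalHeartbeatInterval(30_000L);
        System.out.println(hb.timeToNextHeartbeatMs(0L));       // 0: send the first HB now
        hb.onHeartbeatSent(0L);
        long waiting = hb.timeToNextHeartbeatMs(100L);
        System.out.println(waiting);                            // 29900 while awaiting the response
        System.out.println(appPollTimeoutMs(10_000L, waiting)); // 10000: poll(10s) unaffected
        hb.onHeartbeatResponse(200L, 5_000L);
        System.out.println(hb.timeToNextHeartbeatMs(200L));     // 5000: broker interval in use
    }
}
{code}

Retries, backoff, and heartbeat failures are not modeled. Scheduling the next heartbeat from the time the first response arrives is a choice made for this sketch only.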
--
This message was sent by Atlassian Jira
(v8.20.10#820010)