[jira] [Commented] (KAFKA-10337) Wait for pending async commits in commitSync() even if no offsets are specified

2023-07-19 Thread Tom Lee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17744877#comment-17744877
 ] 

Tom Lee commented on KAFKA-10337:
-

Thanks for pushing this through [~erikvanoosten], nice to see this finally 
land. :)

> Wait for pending async commits in commitSync() even if no offsets are 
> specified
> ---
>
> Key: KAFKA-10337
> URL: https://issues.apache.org/jira/browse/KAFKA-10337
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Tom Lee
>Assignee: Erik van Oosten
>Priority: Major
> Fix For: 3.6.0
>
>
> The JavaDoc for commitSync() states the following:
> {quote}Note that asynchronous offset commits sent previously with the
> {@link #commitAsync(OffsetCommitCallback)}
>  (or similar) are guaranteed to have their callbacks invoked prior to 
> completion of this method.
> {quote}
> But should we happen to call the method with an empty offset map
> (i.e. commitSync(Collections.emptyMap())) the callbacks for any incomplete 
> async commits will not be invoked because of an early return in 
> ConsumerCoordinator.commitOffsetsSync() when the input map is empty.
> If users are doing manual offset commits and relying on commitSync as a 
> barrier for in-flight async commits prior to a rebalance, this could be an 
> important (though somewhat implementation-dependent) detail.
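
For illustration, here is a minimal sketch of the usage pattern described above: 
commitAsync() followed by commitSync() with an empty offset map, used as a 
barrier before a rebalance or shutdown. The broker address, group id, and topic 
name are placeholders, not taken from the ticket.

{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CommitBarrierSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");        // placeholder
        props.put("group.id", "example-group");                   // placeholder
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("example-topic"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                // ... process the record ...
            }

            // Fire-and-forget commit of the consumed offsets.
            consumer.commitAsync((offsets, exception) -> {
                if (exception != null)
                    System.err.println("async commit failed: " + exception);
            });

            // Intended as a barrier: per the JavaDoc, this should not return until
            // the callback above has been invoked. With an empty offset map, the
            // early return described in this ticket skips that wait.
            consumer.commitSync(Collections.emptyMap());
        }
    }
}
{code}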



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-10518) Consumer fetches could be inefficient when lags are unbalanced

2021-03-15 Thread Tom Lee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10518?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17302143#comment-17302143
 ] 

Tom Lee commented on KAFKA-10518:
-

Attaching a contrived repro case. In case it's not obvious: if the repro case is 
run against a multi-broker cluster, the key to the repro is that the partitions 
assigned to the consumer are all fetched from the same broker.

Sample output here: 
[https://gist.githubusercontent.com/thomaslee/fa13c9a10466dc35792173c2485ad84b/raw/34c02bfc9f756eced8b952530b1b6378760fd7cd/bug-repro-output|https://gist.githubusercontent.com/thomaslee/fa13c9a10466dc35792173c2485ad84b/raw/34c02bfc9f756eced8b952530b1b6378760fd7cd/bug-repro-output]

Note the throughput drop from a ballpark ~2-3M records/sec to less than 
200k/sec. This is the point at which the _disable_topic_2_ file is created and 
the producer stops writing to topic_2.

Imagine a scenario where a consumer of topic_2 is downstream of another system 
producing to topic_2: if conditions are right, an incident impacting the 
producer could also impact the consumer. The same applies if the producer is 
decommissioned.

> Consumer fetches could be inefficient when lags are unbalanced
> --
>
> Key: KAFKA-10518
> URL: https://issues.apache.org/jira/browse/KAFKA-10518
> Project: Kafka
>  Issue Type: Bug
>Reporter: Dhruvil Shah
>Priority: Major
> Attachments: kafka-slow-consumer-repro.tar.gz
>
>
> Consumer fetches are inefficient when lags are imbalanced across partitions, 
> due to head-of-line blocking and the behavior of blocking for 
> `fetch.max.wait.ms` until data is available.
> When the consumer receives a fetch response, it prepares the next fetch 
> request and sends it out. The caveat is that the subsequent fetch request 
> explicitly excludes partitions for which the consumer received data in the 
> previous round. This allows the consumer application to drain the data for 
> those partitions while the consumer fetches the other partitions it is 
> subscribed to.
> This behavior does not play out well when the lag is unbalanced, because the 
> consumer receives data for the partitions it is lagging on and then sends a 
> fetch request for partitions that have little or no data. The latter blocks 
> for fetch.max.wait.ms on the broker before an empty response is sent back, 
> which slows down the consumer's overall consumption throughput since 
> fetch.max.wait.ms is 500ms by default.
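
Not part of the ticket, but as a point of reference, a minimal sketch showing 
where fetch.max.wait.ms is configured on the consumer. The broker address and 
group id are placeholders, and the value shown is just the 500ms default 
mentioned above.

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class FetchWaitConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");           // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // Default is 500 ms: the broker holds an otherwise-empty fetch open for up
        // to this long, which is the blocking behavior described above when the
        // next fetch targets partitions with little or no data.
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // ... subscribe and poll as usual ...
        }
    }
}
{code}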



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10518) Consumer fetches could be inefficient when lags are unbalanced

2021-03-15 Thread Tom Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10518?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tom Lee updated KAFKA-10518:

Attachment: kafka-slow-consumer-repro.tar.gz

> Consumer fetches could be inefficient when lags are unbalanced
> --
>
> Key: KAFKA-10518
> URL: https://issues.apache.org/jira/browse/KAFKA-10518
> Project: Kafka
>  Issue Type: Bug
>Reporter: Dhruvil Shah
>Priority: Major
> Attachments: kafka-slow-consumer-repro.tar.gz
>
>
> Consumer fetches are inefficient when lags are imbalanced across partitions, 
> due to head-of-line blocking and the behavior of blocking for 
> `fetch.max.wait.ms` until data is available.
> When the consumer receives a fetch response, it prepares the next fetch 
> request and sends it out. The caveat is that the subsequent fetch request 
> explicitly excludes partitions for which the consumer received data in the 
> previous round. This allows the consumer application to drain the data for 
> those partitions while the consumer fetches the other partitions it is 
> subscribed to.
> This behavior does not play out well when the lag is unbalanced, because the 
> consumer receives data for the partitions it is lagging on and then sends a 
> fetch request for partitions that have little or no data. The latter blocks 
> for fetch.max.wait.ms on the broker before an empty response is sent back, 
> which slows down the consumer's overall consumption throughput since 
> fetch.max.wait.ms is 500ms by default.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10337) Wait for pending async commits in commitSync() even if no offsets are specified

2020-08-01 Thread Tom Lee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17169439#comment-17169439
 ] 

Tom Lee commented on KAFKA-10337:
-

Opened [https://github.com/apache/kafka/pull/9111] to address this particular 
edge case.

> Wait for pending async commits in commitSync() even if no offsets are 
> specified
> ---
>
> Key: KAFKA-10337
> URL: https://issues.apache.org/jira/browse/KAFKA-10337
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Tom Lee
>Priority: Major
>
> The JavaDoc for commitSync() states the following:
> {quote}Note that asynchronous offset commits sent previously with the
> {@link #commitAsync(OffsetCommitCallback)}
>  (or similar) are guaranteed to have their callbacks invoked prior to 
> completion of this method.
> {quote}
> But should we happen to call the method with an empty offset map
> (i.e. commitSync(Collections.emptyMap())) the callbacks for any incomplete 
> async commits will not be invoked because of an early return in 
> ConsumerCoordinator.commitOffsetsSync() when the input map is empty.
> If users are doing manual offset commits and relying on commitSync as a 
> barrier for in-flight async commits prior to a rebalance, this could be an 
> important (though somewhat implementation-dependent) detail.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10337) Wait for pending async commits in commitSync() even if no offsets are specified

2020-08-01 Thread Tom Lee (Jira)
Tom Lee created KAFKA-10337:
---

 Summary: Wait for pending async commits in commitSync() even if no 
offsets are specified
 Key: KAFKA-10337
 URL: https://issues.apache.org/jira/browse/KAFKA-10337
 Project: Kafka
  Issue Type: Bug
  Components: clients
Reporter: Tom Lee


The JavaDoc for commitSync() states the following:
{quote}Note that asynchronous offset commits sent previously with the
{@link #commitAsync(OffsetCommitCallback)}
 (or similar) are guaranteed to have their callbacks invoked prior to 
completion of this method.
{quote}
But should we happen to call the method with an empty offset map
(i.e. commitSync(Collections.emptyMap())) the callbacks for any incomplete 
async commits will not be invoked because of an early return in 
ConsumerCoordinator.commitOffsetsSync() when the input map is empty.

If users are doing manual offset commits and relying on commitSync as a barrier 
for in-flight async commits prior to a rebalance, this could be an important 
(though somewhat implementation-dependent) detail.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8950) KafkaConsumer stops fetching

2019-10-25 Thread Tom Lee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16959918#comment-16959918
 ] 

Tom Lee commented on KAFKA-8950:


Yeah sounds very similar. If 2.3.1 doesn't help when it ships, I'd open a new 
ticket & attach a heap dump and a stack trace next time it gets stuck.

> KafkaConsumer stops fetching
> 
>
> Key: KAFKA-8950
> URL: https://issues.apache.org/jira/browse/KAFKA-8950
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.3.0
> Environment: linux
>Reporter: Will James
>Priority: Major
> Fix For: 2.4.0, 2.3.1
>
>
> We have a KafkaConsumer consuming from a single partition with 
> enable.auto.commit set to true.
> Very occasionally, the consumer goes into a broken state. It returns no 
> records from the broker with every poll, and from most of the Kafka metrics 
> in the consumer it looks like it is fully caught up to the end of the log. 
> We see that we are long polling for the max poll timeout, and that there is 
> zero lag. In addition, we see that the heartbeat rate stays unchanged from 
> before the issue begins (so the consumer stays a part of the consumer group).
> In addition, from looking at the __consumer_offsets topic, it is possible to 
> see that the consumer is committing the same offset on the auto commit 
> interval, however, the offset does not move, and the lag from the broker's 
> perspective continues to increase.
> The issue is only resolved by restarting our application (which restarts the 
> KafkaConsumer instance).
> From a heap dump of an application in this state, I can see that the Fetcher 
> is in a state where it believes there are nodesWithPendingFetchRequests.
> However, I can see the state of the fetch latency sensor, specifically, the 
> fetch rate, and see that the samples were not updated for a long period of 
> time (actually, precisely the amount of time that the problem in our 
> application was occurring, around 50 hours - we have alerting on other 
> metrics but not the fetch rate, so we didn't notice the problem until a 
> customer complained).
> In this example, the consumer was processing around 40 messages per second, 
> with an average size of about 10kb, although most of the other examples of 
> this have happened with higher volume (250 messages / second, around 23kb per 
> message on average).
> I have spent some time investigating the issue on our end, and will continue 
> to do so as time allows, however I wanted to raise this as an issue because 
> it may be affecting other people.
> Please let me know if you have any questions or need additional information. 
> I doubt I can provide heap dumps unfortunately, but I can provide further 
> information as needed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8950) KafkaConsumer stops fetching

2019-10-25 Thread Tom Lee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16959813#comment-16959813
 ] 

Tom Lee commented on KAFKA-8950:


This specific bug was new in 2.3.0, yep. IIRC the map that would get into an 
inconsistent state and cause all the trouble was introduced in 2.3.0. Looking 
at 0.10.2.2 specifically, I think the code as implemented would have been fine.

If it's helpful: we've also had various issues with earlier versions of the 
client libs, but more often than not those were mitigated by config changes, 
client lib upgrades, or system-level tuning. Sysctls like tcp_retries2 & 
tcp_syn_retries are often set too high, misconfigured NICs can be an issue 
because of packet loss, stuff like that.

Request timeouts for consumers in ~0.10 were extremely high because of some 
sort of coupling with group rebalances, and IIRC this didn't get fixed until 
2.0.0. See 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-266%3A+Fix+consumer+indefinite+blocking+behavior.
Producers had similar but different issues with certain configuration options. 
Some of this was more difficult to work around directly without the 2.3.x 
upgrade.

Not to say there are no more issues, but a custom build of 2.3.0 with Will's 
patch has been solid for us so far. By comparison, "vanilla" 2.3.0 would cause 
us trouble maybe once or twice a day.

> KafkaConsumer stops fetching
> 
>
> Key: KAFKA-8950
> URL: https://issues.apache.org/jira/browse/KAFKA-8950
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.3.0
> Environment: linux
>Reporter: Will James
>Priority: Major
> Fix For: 2.4.0, 2.3.1
>
>
> We have a KafkaConsumer consuming from a single partition with 
> enable.auto.commit set to true.
> Very occasionally, the consumer goes into a broken state. It returns no 
> records from the broker with every poll, and from most of the Kafka metrics 
> in the consumer it looks like it is fully caught up to the end of the log. 
> We see that we are long polling for the max poll timeout, and that there is 
> zero lag. In addition, we see that the heartbeat rate stays unchanged from 
> before the issue begins (so the consumer stays a part of the consumer group).
> In addition, from looking at the __consumer_offsets topic, it is possible to 
> see that the consumer is committing the same offset on the auto commit 
> interval, however, the offset does not move, and the lag from the broker's 
> perspective continues to increase.
> The issue is only resolved by restarting our application (which restarts the 
> KafkaConsumer instance).
> From a heap dump of an application in this state, I can see that the Fetcher 
> is in a state where it believes there are nodesWithPendingFetchRequests.
> However, I can see the state of the fetch latency sensor, specifically, the 
> fetch rate, and see that the samples were not updated for a long period of 
> time (actually, precisely the amount of time that the problem in our 
> application was occurring, around 50 hours - we have alerting on other 
> metrics but not the fetch rate, so we didn't notice the problem until a 
> customer complained).
> In this example, the consumer was processing around 40 messages per second, 
> with an average size of about 10kb, although most of the other examples of 
> this have happened with higher volume (250 messages / second, around 23kb per 
> message on average).
> I have spent some time investigating the issue on our end, and will continue 
> to do so as time allows, however I wanted to raise this as an issue because 
> it may be affecting other people.
> Please let me know if you have any questions or need additional information. 
> I doubt I can provide heap dumps unfortunately, but I can provide further 
> information as needed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8950) KafkaConsumer stops fetching

2019-10-14 Thread Tom Lee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16951169#comment-16951169
 ] 

Tom Lee commented on KAFKA-8950:


Sounds good to me [~rsivaram], agree the fix seems trivial. To be clear, happy 
to submit a patch if Will's otherwise occupied -- just wanted to give him first 
right of refusal. :)

Separately: without knowing exactly what release schedules look like, is there 
any possibility of this making a 2.3 point release?

> KafkaConsumer stops fetching
> 
>
> Key: KAFKA-8950
> URL: https://issues.apache.org/jira/browse/KAFKA-8950
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.3.0
> Environment: linux
>Reporter: Will James
>Priority: Major
>
> We have a KafkaConsumer consuming from a single partition with 
> enable.auto.commit set to true.
> Very occasionally, the consumer goes into a broken state. It returns no 
> records from the broker with every poll, and from most of the Kafka metrics 
> in the consumer it looks like it is fully caught up to the end of the log. 
> We see that we are long polling for the max poll timeout, and that there is 
> zero lag. In addition, we see that the heartbeat rate stays unchanged from 
> before the issue begins (so the consumer stays a part of the consumer group).
> In addition, from looking at the __consumer_offsets topic, it is possible to 
> see that the consumer is committing the same offset on the auto commit 
> interval, however, the offset does not move, and the lag from the broker's 
> perspective continues to increase.
> The issue is only resolved by restarting our application (which restarts the 
> KafkaConsumer instance).
> From a heap dump of an application in this state, I can see that the Fetcher 
> is in a state where it believes there are nodesWithPendingFetchRequests.
> However, I can see the state of the fetch latency sensor, specifically, the 
> fetch rate, and see that the samples were not updated for a long period of 
> time (actually, precisely the amount of time that the problem in our 
> application was occurring, around 50 hours - we have alerting on other 
> metrics but not the fetch rate, so we didn't notice the problem until a 
> customer complained).
> In this example, the consumer was processing around 40 messages per second, 
> with an average size of about 10kb, although most of the other examples of 
> this have happened with higher volume (250 messages / second, around 23kb per 
> message on average).
> I have spent some time investigating the issue on our end, and will continue 
> to do so as time allows, however I wanted to raise this as an issue because 
> it may be affecting other people.
> Please let me know if you have any questions or need additional information. 
> I doubt I can provide heap dumps unfortunately, but I can provide further 
> information as needed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8950) KafkaConsumer stops fetching

2019-10-14 Thread Tom Lee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16951140#comment-16951140
 ] 

Tom Lee commented on KAFKA-8950:


[~rsivaram] hm new flag? Couldn't we just move the 
nodesWithPendingFetchRequests.put() above the call to client.send()? Maybe I'm 
misunderstanding.

I'd defer to [~wtjames] on the PR front, but this is really hurting us -- also 
happy to figure it out on our end if folks aren't likely to get this addressed 
soon.
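
For what it's worth, a deliberately simplified sketch of the reordering 
suggested above: register the node in the pending map before handing the 
request to the network client, so a completion listener that fires immediately 
still finds an entry to remove. This is illustrative only and is not the actual 
Fetcher/ConsumerNetworkClient code; the names merely mirror the discussion.

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Illustrative stand-in for the pattern discussed in this thread,
// not the real Fetcher/ConsumerNetworkClient classes.
class PendingFetchSketch {
    private final Map<String, Integer> nodesWithPendingFetchRequests = new HashMap<>();

    synchronized void sendFetch(String nodeId,
                                Function<String, CompletableFuture<Void>> clientSend) {
        // Suggested ordering: mark the node as pending *before* the send, so a
        // listener that runs immediately (already-completed future) still has an
        // entry to remove rather than racing with a put() that hasn't happened yet.
        nodesWithPendingFetchRequests.put(nodeId, 1);

        clientSend.apply(nodeId).whenComplete((ok, err) -> {
            synchronized (this) {
                nodesWithPendingFetchRequests.remove(nodeId);
            }
        });
    }

    synchronized boolean hasPendingFetch(String nodeId) {
        return nodesWithPendingFetchRequests.containsKey(nodeId);
    }
}
{code}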

> KafkaConsumer stops fetching
> 
>
> Key: KAFKA-8950
> URL: https://issues.apache.org/jira/browse/KAFKA-8950
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.3.0
> Environment: linux
>Reporter: Will James
>Priority: Major
>
> We have a KafkaConsumer consuming from a single partition with 
> enable.auto.commit set to true.
> Very occasionally, the consumer goes into a broken state. It returns no 
> records from the broker with every poll, and from most of the Kafka metrics 
> in the consumer it looks like it is fully caught up to the end of the log. 
> We see that we are long polling for the max poll timeout, and that there is 
> zero lag. In addition, we see that the heartbeat rate stays unchanged from 
> before the issue begins (so the consumer stays a part of the consumer group).
> In addition, from looking at the __consumer_offsets topic, it is possible to 
> see that the consumer is committing the same offset on the auto commit 
> interval, however, the offset does not move, and the lag from the broker's 
> perspective continues to increase.
> The issue is only resolved by restarting our application (which restarts the 
> KafkaConsumer instance).
> From a heap dump of an application in this state, I can see that the Fetcher 
> is in a state where it believes there are nodesWithPendingFetchRequests.
> However, I can see the state of the fetch latency sensor, specifically, the 
> fetch rate, and see that the samples were not updated for a long period of 
> time (actually, precisely the amount of time that the problem in our 
> application was occurring, around 50 hours - we have alerting on other 
> metrics but not the fetch rate, so we didn't notice the problem until a 
> customer complained).
> In this example, the consumer was processing around 40 messages per second, 
> with an average size of about 10kb, although most of the other examples of 
> this have happened with higher volume (250 messages / second, around 23kb per 
> message on average).
> I have spent some time investigating the issue on our end, and will continue 
> to do so as time allows, however I wanted to raise this as an issue because 
> it may be affecting other people.
> Please let me know if you have any questions or need additional information. 
> I doubt I can provide heap dumps unfortunately, but I can provide further 
> information as needed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8950) KafkaConsumer stops fetching

2019-10-14 Thread Tom Lee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16951100#comment-16951100
 ] 

Tom Lee commented on KAFKA-8950:


Oh wait, the RequestFutureCompletionHandlers can be invoked by the heartbeat 
thread too, not just the consumer thread.  I think I'm catching up. :)

> KafkaConsumer stops fetching
> 
>
> Key: KAFKA-8950
> URL: https://issues.apache.org/jira/browse/KAFKA-8950
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.3.0
> Environment: linux
>Reporter: Will James
>Priority: Major
>
> We have a KafkaConsumer consuming from a single partition with 
> enable.auto.commit set to true.
> Very occasionally, the consumer goes into a broken state. It returns no 
> records from the broker with every poll, and from most of the Kafka metrics 
> in the consumer it looks like it is fully caught up to the end of the log. 
> We see that we are long polling for the max poll timeout, and that there is 
> zero lag. In addition, we see that the heartbeat rate stays unchanged from 
> before the issue begins (so the consumer stays a part of the consumer group).
> In addition, from looking at the __consumer_offsets topic, it is possible to 
> see that the consumer is committing the same offset on the auto commit 
> interval, however, the offset does not move, and the lag from the broker's 
> perspective continues to increase.
> The issue is only resolved by restarting our application (which restarts the 
> KafkaConsumer instance).
> From a heap dump of an application in this state, I can see that the Fetcher 
> is in a state where it believes there are nodesWithPendingFetchRequests.
> However, I can see the state of the fetch latency sensor, specifically, the 
> fetch rate, and see that the samples were not updated for a long period of 
> time (actually, precisely the amount of time that the problem in our 
> application was occurring, around 50 hours - we have alerting on other 
> metrics but not the fetch rate, so we didn't notice the problem until a 
> customer complained).
> In this example, the consumer was processing around 40 messages per second, 
> with an average size of about 10kb, although most of the other examples of 
> this have happened with higher volume (250 messages / second, around 23kb per 
> message on average).
> I have spent some time investigating the issue on our end, and will continue 
> to do so as time allows, however I wanted to raise this as an issue because 
> it may be affecting other people.
> Please let me know if you have any questions or need additional information. 
> I doubt I can provide heap dumps unfortunately, but I can provide further 
> information as needed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8950) KafkaConsumer stops fetching

2019-10-14 Thread Tom Lee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16951096#comment-16951096
 ] 

Tom Lee commented on KAFKA-8950:


[~rsivaram] plaintext transport here too. Not sure we could provide the full 
thread dump, but for now happy to answer specific questions if you have any 
hunches. Will see what I can figure out wrt the thread dump.

[~wtjames] ah I think I see what you're saying. So for consumer thread C and 
heartbeat/coordinator thread H:

C @ T=0: creates the RequestFuture, adds it to _unsent_ but does not quite get 
to the point where we add the listener
H @ T=1: somehow completes the future from T=0 (e.g. disconnect processing)
C @ T=2: adds the listener, which is immediately invoked on the calling thread 
and attempts to remove the nodesWithPendingFetchRequests entry before it has 
been added
C @ T=3: adds the nodesWithPendingFetchRequests entry, which will never be 
removed because the listener has already fired

Only thing I'm not seeing now is how the futures could actually get 
completed/failed directly on H ... from what I can see they'd typically be 
enqueued into _pendingCompletion_ by RequestFutureCompletionHandler & processed 
on the consumer thread rather than being called directly. It would only need to 
happen once, though. Very interesting, does seem precarious at the very least.

> KafkaConsumer stops fetching
> 
>
> Key: KAFKA-8950
> URL: https://issues.apache.org/jira/browse/KAFKA-8950
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.3.0
> Environment: linux
>Reporter: Will James
>Priority: Major
>
> We have a KafkaConsumer consuming from a single partition with 
> enable.auto.commit set to true.
> Very occasionally, the consumer goes into a broken state. It returns no 
> records from the broker with every poll, and from most of the Kafka metrics 
> in the consumer it looks like it is fully caught up to the end of the log. 
> We see that we are long polling for the max poll timeout, and that there is 
> zero lag. In addition, we see that the heartbeat rate stays unchanged from 
> before the issue begins (so the consumer stays a part of the consumer group).
> In addition, from looking at the __consumer_offsets topic, it is possible to 
> see that the consumer is committing the same offset on the auto commit 
> interval, however, the offset does not move, and the lag from the broker's 
> perspective continues to increase.
> The issue is only resolved by restarting our application (which restarts the 
> KafkaConsumer instance).
> From a heap dump of an application in this state, I can see that the Fetcher 
> is in a state where it believes there are nodesWithPendingFetchRequests.
> However, I can see the state of the fetch latency sensor, specifically, the 
> fetch rate, and see that the samples were not updated for a long period of 
> time (actually, precisely the amount of time that the problem in our 
> application was occurring, around 50 hours - we have alerting on other 
> metrics but not the fetch rate, so we didn't notice the problem until a 
> customer complained).
> In this example, the consumer was processing around 40 messages per second, 
> with an average size of about 10kb, although most of the other examples of 
> this have happened with higher volume (250 messages / second, around 23kb per 
> message on average).
> I have spent some time investigating the issue on our end, and will continue 
> to do so as time allows, however I wanted to raise this as an issue because 
> it may be affecting other people.
> Please let me know if you have any questions or need additional information. 
> I doubt I can provide heap dumps unfortunately, but I can provide further 
> information as needed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8950) KafkaConsumer stops fetching

2019-10-13 Thread Tom Lee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16950707#comment-16950707
 ] 

Tom Lee commented on KAFKA-8950:


Ah wait I misunderstood what you said above: so not trouble with the listener 
execution itself, but the updates to the nodesWithPendingFetchRequests map. I 
don't think it's a problem because Fetcher.send() is synchronized, and both 
onSuccess/onFailure are protected by synchronized blocks. Even if the future 
fires first, send() still holds the lock.

> KafkaConsumer stops fetching
> 
>
> Key: KAFKA-8950
> URL: https://issues.apache.org/jira/browse/KAFKA-8950
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.3.0
> Environment: linux
>Reporter: Will James
>Priority: Major
>
> We have a KafkaConsumer consuming from a single partition with 
> enable.auto.commit set to true.
> Very occasionally, the consumer goes into a broken state. It returns no 
> records from the broker with every poll, and from most of the Kafka metrics 
> in the consumer it looks like it is fully caught up to the end of the log. 
> We see that we are long polling for the max poll timeout, and that there is 
> zero lag. In addition, we see that the heartbeat rate stays unchanged from 
> before the issue begins (so the consumer stays a part of the consumer group).
> In addition, from looking at the __consumer_offsets topic, it is possible to 
> see that the consumer is committing the same offset on the auto commit 
> interval, however, the offset does not move, and the lag from the broker's 
> perspective continues to increase.
> The issue is only resolved by restarting our application (which restarts the 
> KafkaConsumer instance).
> From a heap dump of an application in this state, I can see that the Fetcher 
> is in a state where it believes there are nodesWithPendingFetchRequests.
> However, I can see the state of the fetch latency sensor, specifically, the 
> fetch rate, and see that the samples were not updated for a long period of 
> time (actually, precisely the amount of time that the problem in our 
> application was occurring, around 50 hours - we have alerting on other 
> metrics but not the fetch rate, so we didn't notice the problem until a 
> customer complained).
> In this example, the consumer was processing around 40 messages per second, 
> with an average size of about 10kb, although most of the other examples of 
> this have happened with higher volume (250 messages / second, around 23kb per 
> message on average).
> I have spent some time investigating the issue on our end, and will continue 
> to do so as time allows, however I wanted to raise this as an issue because 
> it may be affecting other people.
> Please let me know if you have any questions or need additional information. 
> I doubt I can provide heap dumps unfortunately, but I can provide further 
> information as needed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-8950) KafkaConsumer stops fetching

2019-10-13 Thread Tom Lee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16950702#comment-16950702
 ] 

Tom Lee edited comment on KAFKA-8950 at 10/14/19 4:09 AM:
--

Hm not sure I see what you mean. I think you're right that checkDisconnects 
could be called by the coordinator/heartbeat thread at the same time send is 
invoked by the fetcher, so no disagreement there. But 
RequestFuture.addListener() will enqueue the listener in a 
ConcurrentLinkedQueue, then check if the future was previously succeeded or 
failed by checking an atomic reference before invoking fire\{Success,Failure\} 
which will then invoke the enqueued listener.

So say we enqueue the listener and then "see" that the future has neither 
succeeded nor failed. It's then guaranteed that the heartbeat/coordinator thread 
will invoke the listener because the enqueue "happened-before" the atomic 
reference write. On the other hand if the atomic reference write 
"happened-before" we check if the future is succeeded or failed, addListener 
will execute fireSuccess/fireFailure which will invoke the listener logic.

It's certainly delicate, but I don't think it's incorrect.

I personally think the issue is that we're somehow entirely missing the 
complete()/raise() somewhere, but I'd be happy to be wrong if we could get it 
fixed. :)



was (Author: thomaslee):
Hm not sure I see what you mean. I think you're right that checkDisconnects 
could be called by the coordinator/heartbeat thread at the same time send is 
invoked by the fetcher (edit: or the other way around!), so no disagreement 
there. But RequestFuture.addListener() will enqueue the listener in a 
ConcurrentLinkedQueue, then check if the future was previously succeeded or 
failed by checking an atomic reference before invoking fire\{Success,Failure\} 
which will then invoke the enqueued listener.

So say we enqueue the listener and then "see" that the future has neither 
succeeded nor failed. It's then guaranteed that the heartbeat/coordinator thread 
will invoke the listener because the enqueue "happened-before" the atomic 
reference write. On the other hand if the atomic reference write 
"happened-before" we check if the future is succeeded or failed, addListener 
will execute fireSuccess/fireFailure which will invoke the listener logic.

It's certainly delicate, but I don't think it's incorrect.

I personally think the issue is that we're somehow entirely missing the 
complete()/raise() somewhere, but I'd be happy to be wrong if we could get it 
fixed. :)


> KafkaConsumer stops fetching
> 
>
> Key: KAFKA-8950
> URL: https://issues.apache.org/jira/browse/KAFKA-8950
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.3.0
> Environment: linux
>Reporter: Will James
>Priority: Major
>
> We have a KafkaConsumer consuming from a single partition with 
> enable.auto.commit set to true.
> Very occasionally, the consumer goes into a broken state. It returns no 
> records from the broker with every poll, and from most of the Kafka metrics 
> in the consumer it looks like it is fully caught up to the end of the log. 
> We see that we are long polling for the max poll timeout, and that there is 
> zero lag. In addition, we see that the heartbeat rate stays unchanged from 
> before the issue begins (so the consumer stays a part of the consumer group).
> In addition, from looking at the __consumer_offsets topic, it is possible to 
> see that the consumer is committing the same offset on the auto commit 
> interval, however, the offset does not move, and the lag from the broker's 
> perspective continues to increase.
> The issue is only resolved by restarting our application (which restarts the 
> KafkaConsumer instance).
> From a heap dump of an application in this state, I can see that the Fetcher 
> is in a state where it believes there are nodesWithPendingFetchRequests.
> However, I can see the state of the fetch latency sensor, specifically, the 
> fetch rate, and see that the samples were not updated for a long period of 
> time (actually, precisely the amount of time that the problem in our 
> application was occurring, around 50 hours - we have alerting on other 
> metrics but not the fetch rate, so we didn't notice the problem until a 
> customer complained).
> In this example, the consumer was processing around 40 messages per second, 
> with an average size of about 10kb, although most of the other examples of 
> this have happened with higher volume (250 messages / second, around 23kb per 
> message on average).
> I have spent some time investigating the issue on our end, and will continue 
> to do so as time allows, however I wanted to raise this as an issue because 
> it may be affecting other people.
> Please let me know if you 

[jira] [Comment Edited] (KAFKA-8950) KafkaConsumer stops fetching

2019-10-13 Thread Tom Lee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16950702#comment-16950702
 ] 

Tom Lee edited comment on KAFKA-8950 at 10/14/19 4:08 AM:
--

Hm not sure I see what you mean. I think you're right that checkDisconnects 
could be called by the coordinator/heartbeat thread at the same time send is 
invoked by the fetcher (edit: or the other way around!), so no disagreement 
there. But RequestFuture.addListener() will enqueue the listener in a 
ConcurrentLinkedQueue, then check if the future was previously succeeded or 
failed by checking an atomic reference before invoking fire\{Success,Failure\} 
which will then invoke the enqueued listener.

So say we enqueue the listener and then "see" that the future has neither 
succeeded nor failed. It's then guaranteed that the heartbeat/coordinator thread 
will invoke the listener because the enqueue "happened-before" the atomic 
reference write. On the other hand if the atomic reference write 
"happened-before" we check if the future is succeeded or failed, addListener 
will execute fireSuccess/fireFailure which will invoke the listener logic.

It's certainly delicate, but I don't think it's incorrect.

I personally think the issue is that we're somehow entirely missing the 
complete()/raise() somewhere, but I'd be happy to be wrong if we could get it 
fixed. :)



was (Author: thomaslee):
Hm not sure I see what you mean. I think you're right that checkDisconnects 
could be called by the coordinator/heartbeat thread at the same time send is 
invoked by the fetcher, so no disagreement there. But 
RequestFuture.addListener() will enqueue the listener in a 
ConcurrentLinkedQueue, then check if the future was previously succeeded or 
failed by checking an atomic reference before invoking fire{Success,Failure} 
which will then invoke the enqueued listener.

So say we enqueue the listener and then "see" that the future has neither 
succeeded nor failed. It's then guaranteed that the heartbeat/coordinator thread 
will invoke the listener because the enqueue "happened-before" the atomic 
reference write. On the other hand if the atomic reference write 
"happened-before" we check if the future is succeeded or failed, addListener 
will execute fireSuccess/fireFailure which will invoke the listener logic.

It's certainly delicate, but I don't think it's incorrect.

I personally think the issue is that we're somehow entirely missing the 
complete()/raise() somewhere, but I'd be happy to be wrong if we could get it 
fixed. :)


> KafkaConsumer stops fetching
> 
>
> Key: KAFKA-8950
> URL: https://issues.apache.org/jira/browse/KAFKA-8950
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.3.0
> Environment: linux
>Reporter: Will James
>Priority: Major
>
> We have a KafkaConsumer consuming from a single partition with 
> enable.auto.commit set to true.
> Very occasionally, the consumer goes into a broken state. It returns no 
> records from the broker with every poll, and from most of the Kafka metrics 
> in the consumer it looks like it is fully caught up to the end of the log. 
> We see that we are long polling for the max poll timeout, and that there is 
> zero lag. In addition, we see that the heartbeat rate stays unchanged from 
> before the issue begins (so the consumer stays a part of the consumer group).
> In addition, from looking at the __consumer_offsets topic, it is possible to 
> see that the consumer is committing the same offset on the auto commit 
> interval, however, the offset does not move, and the lag from the broker's 
> perspective continues to increase.
> The issue is only resolved by restarting our application (which restarts the 
> KafkaConsumer instance).
> From a heap dump of an application in this state, I can see that the Fetcher 
> is in a state where it believes there are nodesWithPendingFetchRequests.
> However, I can see the state of the fetch latency sensor, specifically, the 
> fetch rate, and see that the samples were not updated for a long period of 
> time (actually, precisely the amount of time that the problem in our 
> application was occurring, around 50 hours - we have alerting on other 
> metrics but not the fetch rate, so we didn't notice the problem until a 
> customer complained).
> In this example, the consumer was processing around 40 messages per second, 
> with an average size of about 10kb, although most of the other examples of 
> this have happened with higher volume (250 messages / second, around 23kb per 
> message on average).
> I have spent some time investigating the issue on our end, and will continue 
> to do so as time allows, however I wanted to raise this as an issue because 
> it may be affecting other people.
> Please let me know if you have 

[jira] [Commented] (KAFKA-8950) KafkaConsumer stops fetching

2019-10-13 Thread Tom Lee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16950702#comment-16950702
 ] 

Tom Lee commented on KAFKA-8950:


Hm not sure I see what you mean. I think you're right that checkDisconnects 
could be called by the coordinator/heartbeat thread at the same time send is 
invoked by the fetcher, so no disagreement there. But 
RequestFuture.addListener() will enqueue the listener in a 
ConcurrentLinkedQueue, then check if the future was previously succeeded or 
failed by checking an atomic reference before invoking fire{Success,Failure} 
which will then invoke the enqueued listener.

So say we enqueue the listener and then "see" that the future has neither 
succeeded nor failed. It's then guaranteed that the heartbeat/coordinator thread 
will invoke the listener because the enqueue "happened-before" the atomic 
reference write. On the other hand if the atomic reference write 
"happened-before" we check if the future is succeeded or failed, addListener 
will execute fireSuccess/fireFailure which will invoke the listener logic.

It's certainly delicate, but I don't think it's incorrect.

I personally think the issue is that we're somehow entirely missing the 
complete()/raise() somewhere, but I'd be happy to be wrong if we could get it 
fixed. :)
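
To make the ordering argument above concrete, here is a deliberately simplified 
model of the enqueue-then-check pattern being described (listener added to a 
concurrent queue first, completion state checked second). It is not the actual 
RequestFuture source, just a sketch of the idea.

{code:java}
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Simplified model of the pattern described above, not the real RequestFuture.
class MiniFuture<T> {
    private final AtomicReference<T> result = new AtomicReference<>();
    private final Queue<Consumer<T>> listeners = new ConcurrentLinkedQueue<>();

    void addListener(Consumer<T> listener) {
        listeners.add(listener);      // enqueue first ...
        T value = result.get();       // ... then check completion state
        if (value != null)
            fire(value);              // already completed: fire on the calling thread
    }

    void complete(T value) {
        if (result.compareAndSet(null, value))
            fire(value);              // drain whatever listeners are already enqueued
    }

    private void fire(T value) {
        Consumer<T> listener;
        while ((listener = listeners.poll()) != null)
            listener.accept(value);
    }
}
{code}

Either the completing thread or the adding thread ends up observing both the 
enqueued listener and the completed result, so whichever side gets there second 
fires the listener; that is the happens-before argument being made in the 
comment above.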


> KafkaConsumer stops fetching
> 
>
> Key: KAFKA-8950
> URL: https://issues.apache.org/jira/browse/KAFKA-8950
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.3.0
> Environment: linux
>Reporter: Will James
>Priority: Major
>
> We have a KafkaConsumer consuming from a single partition with 
> enable.auto.commit set to true.
> Very occasionally, the consumer goes into a broken state. It returns no 
> records from the broker with every poll, and from most of the Kafka metrics 
> in the consumer it looks like it is fully caught up to the end of the log. 
> We see that we are long polling for the max poll timeout, and that there is 
> zero lag. In addition, we see that the heartbeat rate stays unchanged from 
> before the issue begins (so the consumer stays a part of the consumer group).
> In addition, from looking at the __consumer_offsets topic, it is possible to 
> see that the consumer is committing the same offset on the auto commit 
> interval, however, the offset does not move, and the lag from the broker's 
> perspective continues to increase.
> The issue is only resolved by restarting our application (which restarts the 
> KafkaConsumer instance).
> From a heap dump of an application in this state, I can see that the Fetcher 
> is in a state where it believes there are nodesWithPendingFetchRequests.
> However, I can see the state of the fetch latency sensor, specifically, the 
> fetch rate, and see that the samples were not updated for a long period of 
> time (actually, precisely the amount of time that the problem in our 
> application was occurring, around 50 hours - we have alerting on other 
> metrics but not the fetch rate, so we didn't notice the problem until a 
> customer complained).
> In this example, the consumer was processing around 40 messages per second, 
> with an average size of about 10kb, although most of the other examples of 
> this have happened with higher volume (250 messages / second, around 23kb per 
> message on average).
> I have spent some time investigating the issue on our end, and will continue 
> to do so as time allows, however I wanted to raise this as an issue because 
> it may be affecting other people.
> Please let me know if you have any questions or need additional information. 
> I doubt I can provide heap dumps unfortunately, but I can provide further 
> information as needed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8950) KafkaConsumer stops fetching

2019-10-13 Thread Tom Lee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16950644#comment-16950644
 ] 

Tom Lee commented on KAFKA-8950:


Believe we're also running into this. High throughput topics/partitions seem to 
be more frequently impacted than others. Like [~wtjames] we saw 
_nodesWithPendingFetchRequests_ indicate that a request was in-flight and we 
saw poll() regularly return an empty record set at our configured timeout 
interval. We were able to dig a little deeper via a heap dump and see there 
were no requests actually in-flight according to 
ConsumerNetworkClient/NetworkClient/Selector, nor were there any indications of 
pending/completed/failed/aborted/unsent requests at the time of the heap dump. 
inFlightRequestsCount was zero. The state of the fetcher's 
nodesWithPendingFetchRequests map just seems to disagree with the underlying 
ConsumerNetworkClient/NetworkClient/Selector. The Fetcher believes a fetch 
response (or timeout) will eventually come and it never does.

Perhaps worst of all, the heartbeat/coordinator thread continued sending 
heartbeats etc. as normal, so no rebalances occurred and the "stuck" partition 
was not released by the consumer. The state of the leader's broker connection 
was still READY. No broker connect/reconnect activity as best I can tell from 
metrics, and no errors in the logs or exceptions thrown as far as I could see.

Given how simple the synchronization around the nodesWithPendingFetchRequests 
map is, it's as though we're somehow failing to invoke [this request 
listener|https://github.com/apache/kafka/blob/2.3.0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L248-L249]
 or something like that, but the underlying networking code is tricky enough 
that it's difficult for me to speculate where. I've been eyeballing the 
_UnsentRequests_ data structure, since it's one of the few places where state 
is manipulated outside of coarse synchronized blocks in both the coordinator & 
consumer threads and there have been bugs reported against it in the past, but 
so far I've come up empty.

I can imagine that an exception thrown at the wrong time/place could cause the 
RequestFuture listener not to be invoked, but I have no evidence of that 
actually happening.

> KafkaConsumer stops fetching
> 
>
> Key: KAFKA-8950
> URL: https://issues.apache.org/jira/browse/KAFKA-8950
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.3.0
> Environment: linux
>Reporter: Will James
>Priority: Major
>
> We have a KafkaConsumer consuming from a single partition with 
> enable.auto.commit set to true.
> Very occasionally, the consumer goes into a broken state. It returns no 
> records from the broker with every poll, and from most of the Kafka metrics 
> in the consumer it looks like it is fully caught up to the end of the log. 
> We see that we are long polling for the max poll timeout, and that there is 
> zero lag. In addition, we see that the heartbeat rate stays unchanged from 
> before the issue begins (so the consumer stays a part of the consumer group).
> In addition, from looking at the __consumer_offsets topic, it is possible to 
> see that the consumer is committing the same offset on the auto commit 
> interval, however, the offset does not move, and the lag from the broker's 
> perspective continues to increase.
> The issue is only resolved by restarting our application (which restarts the 
> KafkaConsumer instance).
> From a heap dump of an application in this state, I can see that the Fetcher 
> is in a state where it believes there are nodesWithPendingFetchRequests.
> However, I can see the state of the fetch latency sensor, specifically, the 
> fetch rate, and see that the samples were not updated for a long period of 
> time (actually, precisely the amount of time that the problem in our 
> application was occurring, around 50 hours - we have alerting on other 
> metrics but not the fetch rate, so we didn't notice the problem until a 
> customer complained).
> In this example, the consumer was processing around 40 messages per second, 
> with an average size of about 10kb, although most of the other examples of 
> this have happened with higher volume (250 messages / second, around 23kb per 
> message on average).
> I have spent some time investigating the issue on our end, and will continue 
> to do so as time allows, however I wanted to raise this as an issue because 
> it may be affecting other people.
> Please let me know if you have any questions or need additional information. 
> I doubt I can provide heap dumps unfortunately, but I can provide further 
> information as needed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)