[
https://issues.apache.org/jira/browse/KAFKA-17182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18020120#comment-18020120
]
Arpit Goyal commented on KAFKA-17182:
-------------------------------------
[~kirktrue] [~lianetm] I see we added a wakeup call in handleFetchSuccess but
not in handleFetchFailure. Won't the consumer stall when a failure happens
during the fetch?
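For context, a rough sketch of the symmetry in question. The class and method bodies below are hypothetical stand-ins, not the actual fetch-path code; the point is only that the failure path could wake a blocked poll the same way the success path does:
{code:java}
// Hypothetical sketch only: names and bodies are stand-ins for the real fetch
// completion handlers. It illustrates waking the application thread from both
// the success and the failure path so a blocked poll() cannot stall.
public class FetchCompletionSketch {

    private final Object pollCondition = new Object();

    void handleFetchSuccess(Object response) {
        bufferRecords(response);
        wakeApplicationThread(); // present today: poll() is unblocked when data arrives
    }

    void handleFetchFailure(Throwable error) {
        recordError(error);
        wakeApplicationThread(); // the question above: should the failure path also wake poll()?
    }

    private void wakeApplicationThread() {
        synchronized (pollCondition) {
            pollCondition.notifyAll(); // unblock any thread waiting for fetch results
        }
    }

    private void bufferRecords(Object response) { /* add records to the fetch buffer */ }

    private void recordError(Throwable error) { /* stash the error so poll() can surface it */ }
}
{code}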
> Consumer fetch sessions are evicted too quickly with AsyncKafkaConsumer
> -----------------------------------------------------------------------
>
> Key: KAFKA-17182
> URL: https://issues.apache.org/jira/browse/KAFKA-17182
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Affects Versions: 3.8.0
> Reporter: Kirk True
> Assignee: Kirk True
> Priority: Blocker
> Labels: consumer-threading-refactor, fetcher,
> kip-848-client-support
> Fix For: 4.0.0, 4.1.0
>
> Attachments: 1.png, 2.png, 3.png
>
>
> h1. Background
> {{Consumer}} implementations fetch data from the cluster and temporarily
> buffer it in memory until the user next calls {{{}Consumer.poll(){}}}. When a
> fetch request is being generated, partitions that already have buffered data
> are not included in the fetch request.
> The {{ClassicKafkaConsumer}} performs much of its fetch logic and network I/O
> in the application thread. On {{{}poll(){}}}, if there is any
> locally-buffered data, the {{ClassicKafkaConsumer}} does not fetch _any_ new
> data and simply returns the buffered data to the user from {{{}poll(){}}}.
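> To make that short-circuit concrete, here is a minimal sketch of the flow. The class and method names are hypothetical, not the actual {{ClassicKafkaConsumer}} internals:
> {code:java}
> // Illustrative sketch of the classic poll() short-circuit described above.
> // Names are hypothetical; the real logic lives in the consumer's fetch path.
> import java.time.Duration;
> import java.util.ArrayList;
> import java.util.List;
>
> class ClassicPollSketch<R> {
>
>     private final List<R> fetchBuffer = new ArrayList<>();
>
>     List<R> poll(Duration timeout) {
>         if (!fetchBuffer.isEmpty()) {
>             // Buffered data exists: return it immediately and send no new fetch.
>             List<R> result = new ArrayList<>(fetchBuffer);
>             fetchBuffer.clear();
>             return result;
>         }
>         sendFetchRequests();            // only issued when nothing is buffered
>         return awaitAndDrain(timeout);  // block up to the timeout for responses
>     }
>
>     private void sendFetchRequests() { /* network I/O on the application thread */ }
>
>     private List<R> awaitAndDrain(Duration timeout) { return List.of(); }
> }
> {code}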
> On the other hand, the {{AsyncKafkaConsumer}} consumer splits its logic and
> network I/O between two threads, which results in a potential race condition
> during fetch. The {{AsyncKafkaConsumer}} also checks for buffered data on its
> application thread. If it finds there is none, it signals the background
> thread to create a fetch request. However, it's possible for the background
> thread to receive data from a previous fetch and buffer it before the fetch
> request logic starts. When that occurs, as the background thread creates a
> new fetch request, it skips any buffered data, which has the unintended
> result that those partitions get added to the fetch request's "to remove"
> set. This signals to the broker to remove those partitions from its internal
> cache.
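> As a rough illustration of how a buffered partition ends up in the "to remove" set, consider the following sketch. The types are hypothetical stand-ins; the real code tracks one fetch session per broker:
> {code:java}
> // Sketch of the request-building step described above. A partition that is
> // skipped because it has buffered data, but is still part of the broker-side
> // fetch session, lands in the "to remove"/forget set and is evicted from the
> // broker's session cache. Types and names here are hypothetical.
> import java.util.HashSet;
> import java.util.Set;
>
> class FetchSessionSketch {
>
>     /** Partitions currently tracked in the broker-side fetch session. */
>     private final Set<String> sessionPartitions = new HashSet<>();
>
>     record FetchRequestData(Set<String> toSend, Set<String> toForget) { }
>
>     FetchRequestData build(Set<String> assignedPartitions, Set<String> bufferedPartitions) {
>         Set<String> toSend = new HashSet<>();
>         Set<String> toForget = new HashSet<>();
>
>         for (String tp : assignedPartitions) {
>             if (bufferedPartitions.contains(tp)) {
>                 // Skipped because data is already buffered locally...
>                 if (sessionPartitions.remove(tp)) {
>                     // ...but since it is in the session and absent from this
>                     // request, it is marked for removal from the broker's cache.
>                     toForget.add(tp);
>                 }
>             } else {
>                 toSend.add(tp);
>                 sessionPartitions.add(tp);
>             }
>         }
>         return new FetchRequestData(toSend, toForget);
>     }
> }
> {code}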
> This issue is technically possible in the {{ClassicKafkaConsumer}} too, since
> the heartbeat thread performs network I/O in addition to the application
> thread. However, because of the frequency at which the
> {{{}AsyncKafkaConsumer{}}}'s background thread runs, it is ~100x more likely
> to happen.
> h1. Options
> The core decision is: what should the background thread do when it is asked to
> create a fetch request but discovers there is already buffered data? There were
> multiple proposals to address this issue in the {{{}AsyncKafkaConsumer{}}}.
> Among them are:
> # The background thread should omit buffered partitions from the fetch
> request as before (this is the existing behavior)
> # The background thread should skip the fetch request generation entirely if
> there are _any_ buffered partitions
> # The background thread should include buffered partitions in the fetch
> request, but use a small “max bytes” value
> # The background thread should skip fetching from the nodes that have
> buffered partitions
> Option 3 won out. The change in {{AsyncKafkaConsumer}} is to include in the
> fetch request any partition with buffered data. By using a "max bytes" size
> of 1, this should cause the fetch response to return as little data as
> possible. In that way, the consumer doesn't buffer too much data on the
> client before it can be returned from {{{}poll(){}}}.
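> A sketch of that idea follows. The builder shown here is hypothetical; the actual change lives in the {{AsyncKafkaConsumer}} fetch path:
> {code:java}
> // Sketch of option 3: buffered partitions stay in the fetch request (so the
> // broker keeps them in its session cache) but ask for at most 1 byte, so the
> // response carries essentially no extra data. Names are hypothetical.
> import java.util.LinkedHashMap;
> import java.util.Map;
> import java.util.Set;
>
> class MaxBytesOneSketch {
>
>     record PartitionFetch(long fetchOffset, int maxBytes) { }
>
>     static final int DEFAULT_PARTITION_MAX_BYTES = 1024 * 1024; // a typical per-partition default
>     static final int BUFFERED_PARTITION_MAX_BYTES = 1;          // "as little data as possible"
>
>     Map<String, PartitionFetch> buildFetch(Map<String, Long> fetchOffsets,
>                                            Set<String> bufferedPartitions) {
>         Map<String, PartitionFetch> request = new LinkedHashMap<>();
>         fetchOffsets.forEach((tp, offset) -> {
>             int maxBytes = bufferedPartitions.contains(tp)
>                     ? BUFFERED_PARTITION_MAX_BYTES   // included, but returns ~no data
>                     : DEFAULT_PARTITION_MAX_BYTES;
>             request.put(tp, new PartitionFetch(offset, maxBytes));
>         });
>         return request;
>     }
> }
> {code}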
> h1. Testing
> h2. Eviction rate testing
> Here are the results of our internal stress testing:
> * {{{}ClassicKafkaConsumer{}}}—after the initial spike during test start up,
> the average rate settles down to ~0.14 evictions/second
> !1.png|width=833,height=300!
> * {{{}AsyncKafkaConsumer{}}} (w/o fix)—after startup, the evictions still
> settle down, but they are ~100x higher than the {{ClassicKafkaConsumer}} at
> ~1.48 evictions/second
> !2.png|width=830,height=300!
> * {{AsyncKafkaConsumer}} (w/ fix)—the eviction rate is now closer to the
> {{ClassicKafkaConsumer}} at ~0.22 evictions/second
> !3.png|width=833,height=300!
> h2. {{EndToEndLatency}} testing
> The bundled {{EndToEndLatency}} test runner was executed on a single machine
> using Docker. The {{apache/kafka:latest}} Docker image was used with either
> the {{cluster/combined/plaintext/docker-compose.yml}} or the
> {{single-node/plaintext/docker-compose.yml}} Docker Compose configuration,
> depending on the test. The Docker containers were recreated from
> scratch before each test.
> A single topic was created with 30 partitions and a replication factor
> of either 1 or 3, depending on whether the setup was single- or multi-node.
> For each of the test runs these argument values were used:
> * Message count: 100000
> * {{{}acks{}}}: 1
> * Message size: 128 bytes
> A configuration file containing a single setting,
> {{group.protocol=<$group_protocol>}}, was also provided to the test, where
> {{$group_protocol}} was either {{CLASSIC}} or {{{}CONSUMER{}}}.
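> For reference, the same setting expressed as programmatic consumer configuration ({{group.protocol}} is the real client config key; the bootstrap address and group id below are placeholders, not the values used in the tests):
> {code:java}
> // Programmatic equivalent of the test's one-line configuration file.
> // "group.protocol" is the actual client setting; the broker address and
> // group id are placeholders.
> import java.util.Properties;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.serialization.ByteArrayDeserializer;
>
> public class GroupProtocolConfig {
>
>     public static KafkaConsumer<byte[], byte[]> create(String groupProtocol) {
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "localhost:9092");           // placeholder
>         props.put("group.id", "end-to-end-latency-test");           // placeholder
>         props.put("group.protocol", groupProtocol);                  // "classic" or "consumer"
>         props.put("key.deserializer", ByteArrayDeserializer.class.getName());
>         props.put("value.deserializer", ByteArrayDeserializer.class.getName());
>         return new KafkaConsumer<>(props);
>     }
> }
> {code}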
> h3. Test results
> h4. Test 1—{{{}CLASSIC{}}} group protocol, cluster size: 3 nodes, replication
> factor: 3
> ||Metric (ms)||{{trunk}}||PR||
> |Average latency|1.4901|1.4871|
> |50th percentile|1|1|
> |99th percentile|3|3|
> |99.9th percentile|6|6|
> h4. Test 2—{{{}CONSUMER{}}} group protocol, cluster size: 3 nodes,
> replication factor: 3
> ||Metric (ms)||{{trunk}}||PR||
> |Average latency|1.4704|1.4807|
> |50th percentile|1|1|
> |99th percentile|3|3|
> |99.9th percentile|6|7|
> h4. Test 3—{{{}CLASSIC{}}} group protocol, cluster size: 1 node, replication
> factor: 1
> ||Metric (ms)||{{trunk}}||PR||
> |Average latency|1.0777|1.0193|
> |50th percentile|1|1|
> |99th percentile|2|2|
> |99.9th percentile|5|4|
> h4. Test 4—{{{}CONSUMER{}}} group protocol, cluster size: 1 node, replication
> factor: 1
> ||Metric (ms)||{{trunk}}||PR||
> |Average latency|1.0937|1.0503|
> |50th percentile|1|1|
> |99th percentile|2|2|
> |99.9th percentile|4|4|
> h3. Conclusion
> These tests did not reveal any significant differences between the current
> fetcher logic on {{trunk}} and the one proposed in this PR. Additional test
> runs using larger message counts and/or larger message sizes did not affect
> the results.