[ https://issues.apache.org/jira/browse/KAFKA-14189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Junyang Liu updated KAFKA-14189: -------------------------------- Summary: Improve connection limit and reuse of coordinator and leader in KafkaConsumer (was: improve connection limit and reuse of coordinator and leader in KafkaConsumer) > Improve connection limit and reuse of coordinator and leader in KafkaConsumer > ----------------------------------------------------------------------------- > > Key: KAFKA-14189 > URL: https://issues.apache.org/jira/browse/KAFKA-14189 > Project: Kafka > Issue Type: Improvement > Components: clients > Affects Versions: 0.9.0.0 > Reporter: Junyang Liu > Priority: Major > > The connection id of connection with coordinator in KafkaConsumer is > Integer.MAX_VALUE - coordinator id, which is different with connection id of > partition leader. So the connection cannot be reused when coordinator and > leader are in the same broker, which means we need two seperated connections > with the same broker. Suppose such case, a consumer has connected to the > coordinator and finished Join and Sync, and wants to send FETCH to leader in > the same broker. But the connection count has reached limit, so the consumer > with be in the group but cannot consume messages > partial logs: > {code:java} > Added READ_UNCOMMITTED fetch request for partition topic-test-4 at offset 9 > to node <ip>:9092 (id: 2 rack: 2) > (org.apache.kafka.clients.consumer.internals.Fetcher) > Built full fetch (sessionId=INVALID, epoch=INITIAL) for node 2 with 1 > partition(s). (org.apache.kafka.clients.FetchSessionHandler) > Sending READ_UNCOMMITTED FullFetchRequest(topic-test-4) to broker <ip>:9092 > (id: 2 rack: 2) (org.apache.kafka.clients.consumer.internals.Fetcher) > Initiating connection to node <ip>:9092 (id: 2 rack: 2) using address /<ip> > (org.apache.kafka.clients.NetworkClient) > Using older server API v3 to send OFFSET_COMMIT > {group_id=group-test,generation_id=134,member_id=consumer-11-2e2b16eb-516c-496c-8aa4-c6e990b43598,retention_time=-1,topics=[{topic=topic-test,partitions=[{partition=3,offset=0,metadata=},{partition=4,offset=9,metadata=},{partition=5,offset=13,metadata=}]}]} > with correlation id 242 to node 2147483645 > (org.apache.kafka.clients.NetworkClient) > Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to > node 2 (org.apache.kafka.common.network.Selector) > Completed connection to node 2. Fetching API versions. > (org.apache.kafka.clients.NetworkClient) > Initiating API versions fetch from node 2. > (org.apache.kafka.clients.NetworkClient) > Subscribed to topic(s): topic-test > (org.apache.kafka.clients.consumer.KafkaConsumer) > Connection with /<ip> disconnected (org.apache.kafka.common.network.Selector) > Node 2 disconnected. (org.apache.kafka.clients.NetworkClient) {code} > connection to coordinator, rebalance and fetching offsets have finished. when > preparing connection to leader for fetching, the connection limit has > reached, so after tcp connection, the broker disconnect the client. > > The root cause of this issue is that the process of consuming is a > combination of multiple connections(connections with coordinator and leader > in same broker), not atomic, which may leads to "half connected". I think we > can do some improvement: > # reuse the connection with coordinator and leader in the same broker > # make the connection limit more flexible, such as allowing extra related > connections of a consumer when the connection count limit has reached if it > has connected to broker -- This message was sent by Atlassian Jira (v8.20.10#820010)