[ https://issues.apache.org/jira/browse/KAFKA-14189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Junyang Liu updated KAFKA-14189:
--------------------------------
    Description: 
In KafkaConsumer, the connection id used for the coordinator is Integer.MAX_VALUE - coordinator id, which differs from the connection id used for the partition leader. So the connection cannot be reused when the coordinator and the leader are on the same broker, which means we need two separate connections to the same broker. Consider this case: a consumer has connected to the coordinator and finished Join and Sync, and now wants to send FETCH requests to a leader on the same broker. But the broker's connection count has already reached its limit, so the consumer will be in the group but cannot consume messages.
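For illustration, here is a minimal sketch of the id arithmetic described above (not the client's internal code, just the effect of the scheme); the node id 2147483645 in the logs below is Integer.MAX_VALUE - 2:
{code:java}
import org.apache.kafka.common.Node;

// Two Node handles for the same broker (id 2 in the logs below): the leader is
// addressed by its real broker id, the coordinator by a synthetic id, so
// NetworkClient keys them as two separate connections to the same host:port.
public class CoordinatorConnectionId {
    public static void main(String[] args) {
        int brokerId = 2;  // broker that is both coordinator and leader of topic-test-4
        Node leader = new Node(brokerId, "<ip>", 9092);
        Node coordinator = new Node(Integer.MAX_VALUE - brokerId, "<ip>", 9092);

        System.out.println(leader.idString());       // 2
        System.out.println(coordinator.idString());  // 2147483645, as seen in the logs
    }
}
{code}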

partial logs:
{code:java}
Added READ_UNCOMMITTED fetch request for partition topic-test-4 at offset 9 to 
node <ip>:9092 (id: 2 rack: 2) 
(org.apache.kafka.clients.consumer.internals.Fetcher)
Built full fetch (sessionId=INVALID, epoch=INITIAL) for node 2 with 1 
partition(s). (org.apache.kafka.clients.FetchSessionHandler)
Sending READ_UNCOMMITTED FullFetchRequest(topic-test-4) to broker <ip>:9092 
(id: 2 rack: 2) (org.apache.kafka.clients.consumer.internals.Fetcher)
Initiating connection to node <ip>:9092 (id: 2 rack: 2) using address /<ip> 
(org.apache.kafka.clients.NetworkClient)
Using older server API v3 to send OFFSET_COMMIT 
{group_id=group-test,generation_id=134,member_id=consumer-11-2e2b16eb-516c-496c-8aa4-c6e990b43598,retention_time=-1,topics=[{topic=topic-test,partitions=[{partition=3,offset=0,metadata=},{partition=4,offset=9,metadata=},{partition=5,offset=13,metadata=}]}]}
 with correlation id 242 to node 2147483645 
(org.apache.kafka.clients.NetworkClient)
Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to 
node 2 (org.apache.kafka.common.network.Selector)
Completed connection to node 2. Fetching API versions. 
(org.apache.kafka.clients.NetworkClient)
Initiating API versions fetch from node 2. 
(org.apache.kafka.clients.NetworkClient)
Subscribed to topic(s): topic-test 
(org.apache.kafka.clients.consumer.KafkaConsumer)
Connection with /<ip> disconnected (org.apache.kafka.common.network.Selector)
Node 2 disconnected. (org.apache.kafka.clients.NetworkClient) {code}
The connection to the coordinator, the rebalance, and the offset fetch have all completed. When the consumer then prepares the connection to the leader for fetching, the connection limit has already been reached, so the broker disconnects the client right after the TCP connection is established.
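The consumer side here is nothing special; the symptom can be reproduced with a plain consumer such as the sketch below (topic and group names are taken from the logs, the broker address and a recent client version are assumptions), provided the broker's connection limit for that broker is already reached:
{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Group membership and the offset fetch succeed over the coordinator connection,
// but the second connection to the same broker (the partition leader) is dropped
// by the broker, so poll() keeps returning no records.
public class StuckConsumerRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "<ip>:9092");
        props.put("group.id", "group-test");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("topic-test"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                // Stays at 0 even though the consumer is a live member of group-test.
                System.out.println("records polled: " + records.count());
            }
        }
    }
}
{code}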

 

The root cause of this issue is that consuming relies on multiple connections (connections to the coordinator and to the leader on the same broker) rather than a single atomic one, which may leave the consumer "half connected". I think we can make some improvements:
 # reuse the connection when the coordinator and the leader are on the same broker (a rough sketch of this idea follows the list)
 # make the connection limit more flexible, for example by allowing a consumer that is already connected to a broker to open the extra related connections it needs even after the connection count limit has been reached
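As a purely hypothetical illustration of the first idea (the method name and its placement are my own, not an actual patch), the coordinator lookup could reuse the broker's real node id whenever the coordinator address matches a broker already known from metadata, so NetworkClient could share the existing connection:
{code:java}
import java.util.List;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;

public class CoordinatorNodeReuse {
    // Hypothetical helper: pick the Node to use for coordinator requests.
    static Node coordinatorNode(Cluster cluster, int coordinatorId, String host, int port) {
        List<Node> nodes = cluster.nodes();
        for (Node node : nodes) {
            if (node.host().equals(host) && node.port() == port) {
                // Coordinator and a known broker are the same machine: reuse its
                // connection id so the existing connection can be shared.
                return node;
            }
        }
        // Otherwise keep the current scheme: a dedicated coordinator connection id.
        return new Node(Integer.MAX_VALUE - coordinatorId, host, port);
    }
}
{code}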



> improve connection limit and reuse of coordinator and leader in KafkaConsumer
> -----------------------------------------------------------------------------
>
>                 Key: KAFKA-14189
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14189
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>    Affects Versions: 0.9.0.0
>            Reporter: Junyang Liu
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
