[ 
https://issues.apache.org/jira/browse/KAFKA-14189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Junyang Liu updated KAFKA-14189:
--------------------------------
    Description: 
In KafkaConsumer, the node id used for the connection to the group coordinator 
is Integer.MAX_VALUE - coordinator id, which differs from the node id used for 
the connection to a partition leader. As a result, the connection cannot be 
reused when the coordinator and the leader are on the same broker, and the 
consumer needs two separate connections to that broker. Consider this case: a 
consumer has connected to the coordinator and finished Join and Sync, and now 
wants to send a FETCH to a leader on the same broker. If the broker's 
connection count has already reached its limit, the consumer will be in the 
group but cannot consume messages.
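
For reference, the dedicated coordinator node id comes from the consumer's 
FindCoordinator response handling; the snippet below is a paraphrase of the 
relevant logic in org.apache.kafka.clients.consumer.internals.AbstractCoordinator 
(names simplified, not a verbatim copy):
{code:java}
// Paraphrased from AbstractCoordinator's FindCoordinator response handling.
// The consumer deliberately uses Integer.MAX_VALUE - node.id as the
// coordinator's node id so that the NetworkClient keeps a dedicated
// connection for coordinator traffic, even when the coordinator is the
// same broker as a partition leader.
int coordinatorConnectionId = Integer.MAX_VALUE - response.node().id();
this.coordinator = new Node(coordinatorConnectionId,
                            response.node().host(),
                            response.node().port());
{code}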

partial logs:
{code:java}
DEBUG [Consumer clientId=consumer-11, groupId=group-test] Added 
READ_UNCOMMITTED fetch request for partition topic-test-4 at offset 9 to node 
<ip>:9092 (id: 2 rack: 2) (org.apache.kafka.clients.consumer.internals.Fetcher) 
DEBUG [Consumer clientId=consumer-11, groupId=group-test] Built full fetch 
(sessionId=INVALID, epoch=INITIAL) for node 2 with 1 partition(s). 
(org.apache.kafka.clients.FetchSessionHandler) 
DEBUG [Consumer clientId=consumer-11, groupId=group-test] Sending 
READ_UNCOMMITTED FullFetchRequest(topic-test-4) to broker <ip>:9092 (id: 2 
rack: 2) (org.apache.kafka.clients.consumer.internals.Fetcher)
DEBUG [Consumer clientId=consumer-11, groupId=group-test] Initiating connection 
to node <ip>:9092 (id: 2 rack: 2) using address /<ip> 
(org.apache.kafka.clients.NetworkClient) 
DEBUG [Consumer clientId=consumer-11, groupId=group-test] Using older server 
API v3 to send OFFSET_COMMIT 
{group_id=group-test,generation_id=134,member_id=consumer-11-2e2b16eb-516c-496c-8aa4-c6e990b43598,retention_time=-1,topics=[{topic=topic-test,partitions=[{partition=3,offset=0,metadata=},{partition=4,offset=9,metadata=},{partition=5,offset=13,metadata=}]}]}
 with correlation id 242 to node 2147483645 
(org.apache.kafka.clients.NetworkClient)
DEBUG [Consumer clientId=consumer-11, groupId=group-test] Created socket with 
SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node 2 
(org.apache.kafka.common.network.Selector)
DEBUG [Consumer clientId=consumer-11, groupId=group-test] Completed connection 
to node 2. Fetching API versions. (org.apache.kafka.clients.NetworkClient)
DEBUG [Consumer clientId=consumer-11, groupId=group-test] Initiating API 
versions fetch from node 2. (org.apache.kafka.clients.NetworkClient)
DEBUG [Consumer clientId=consumer-11, groupId=group-test] Subscribed to 
topic(s): topic-test (org.apache.kafka.clients.consumer.KafkaConsumer)
DEBUG [Consumer clientId=consumer-11, groupId=group-test] Connection with /<ip> 
disconnected (org.apache.kafka.common.network.Selector)
DEBUG [Consumer clientId=consumer-11, groupId=group-test] Node 2 disconnected. 
(org.apache.kafka.clients.NetworkClient) 
DEBUG [Consumer clientId=consumer-11, groupId=group-test] Cancelled request 
with header RequestHeader(apiKey=FETCH, apiVersion=10, clientId=consumer-11, 
correlationId=241) due to node 2 being disconnected 
(org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
DEBUG [Consumer clientId=consumer-11, groupId=group-test] Error sending fetch 
request (sessionId=INVALID, epoch=INITIAL) to node 2: 
org.apache.kafka.common.errors.DisconnectException. 
(org.apache.kafka.clients.FetchSessionHandler){code}
At this point the connection to the coordinator, the rebalance, and the offset 
fetch have all completed. When the consumer then initiates the connection to 
the leader for fetching, the broker's connection limit has already been 
reached, so the broker disconnects the client right after the TCP connection 
is established.
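
The node ids in the log make the split visible. A minimal worked example for 
broker 2:
{code:java}
// Worked example for broker id 2, matching the node ids in the log above:
int brokerId = 2;
int leaderConnectionId = brokerId;                          // node 2: carries FETCH
int coordinatorConnectionId = Integer.MAX_VALUE - brokerId; // node 2147483645: carries OFFSET_COMMIT etc.
// Two different node ids mean two separate TCP connections to the same broker.
{code}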

 

The root cause of this issue is that consuming requires multiple connections 
(one to the coordinator and one to the leader, even when both are on the same 
broker) and is not atomic, which can leave the consumer "half connected". I 
think we can improve this in two ways:
 # reuse the connection for the coordinator and the leader when they are on 
the same broker, so the KafkaConsumer avoids such a "half connection" when the 
connection count limit is reached (see the sketch after this list)
 # make the connection limit more flexible, e.g. allow a consumer that is 
already connected to a broker to open extra related connections even after the 
connection count limit has been reached
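
As a rough illustration of improvement 1 (a hypothetical sketch, not current 
Kafka behavior): if the coordinator node kept the plain broker id, the 
NetworkClient would find the existing connection to that broker instead of 
opening a second one:
{code:java}
// Hypothetical sketch of improvement 1 (NOT what Kafka does today).
// Reusing the plain broker id would let coordinator and fetch traffic share
// one connection, avoiding the "half connected" state at the connection limit.
this.coordinator = new Node(response.node().id(),   // plain broker id, no MAX_VALUE offset
                            response.node().host(),
                            response.node().port());
// Trade-off: coordinator requests could then queue behind large in-flight
// FETCH responses on the shared connection, which is a likely reason the
// dedicated coordinator id exists in the first place.
{code}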



> Improve connection limit and reuse of coordinator and leader in KafkaConsumer
> -----------------------------------------------------------------------------
>
>                 Key: KAFKA-14189
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14189
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>    Affects Versions: 0.9.0.0
>            Reporter: Junyang Liu
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
