[jira] [Commented] (KAFKA-14189) Improve connection limit and reuse of coordinator and leader in KafkaConsumer

2022-08-30 Thread Von Gosling (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14189?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17597851#comment-17597851
 ] 

Von Gosling commented on KAFKA-14189:
-

I'd like to hear some suggestions from [~junrao]. Is it possible to reuse the 
same connection in this situation?

> Improve connection limit and reuse of coordinator and leader in KafkaConsumer
> -
>
> Key: KAFKA-14189
> URL: https://issues.apache.org/jira/browse/KAFKA-14189
> Project: Kafka
> Issue Type: Improvement
> Components: clients
> Affects Versions: 0.9.0.0
> Reporter: Junyang Liu
> Priority: Major
>
> The connection id of the connection to the coordinator in KafkaConsumer is 
> Integer.MAX_VALUE - coordinator id, which differs from the connection id used 
> for the partition leader. So the connection cannot be reused when the 
> coordinator and the leader are on the same broker, which means we need two 
> separate connections to that broker. Consider this case: a consumer has 
> connected to the coordinator, finished Join and Sync, and wants to send FETCH 
> to the leader on the same broker. But the connection count has reached its 
> limit, so the consumer will be in the group but cannot consume messages.
> Partial logs:
> {code:java}
> Added READ_UNCOMMITTED fetch request for partition topic-test-4 at offset 9 
> to node :9092 (id: 2 rack: 2) 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> Built full fetch (sessionId=INVALID, epoch=INITIAL) for node 2 with 1 
> partition(s). (org.apache.kafka.clients.FetchSessionHandler)
> Sending READ_UNCOMMITTED FullFetchRequest(topic-test-4) to broker :9092 
> (id: 2 rack: 2) (org.apache.kafka.clients.consumer.internals.Fetcher)
> Initiating connection to node :9092 (id: 2 rack: 2) using address / 
> (org.apache.kafka.clients.NetworkClient)
> Using older server API v3 to send OFFSET_COMMIT 
> {group_id=group-test,generation_id=134,member_id=consumer-11-2e2b16eb-516c-496c-8aa4-c6e990b43598,retention_time=-1,topics=[{topic=topic-test,partitions=[{partition=3,offset=0,metadata=},{partition=4,offset=9,metadata=},{partition=5,offset=13,metadata=}]}]}
>  with correlation id 242 to node 2147483645 
> (org.apache.kafka.clients.NetworkClient)
> Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to 
> node 2 (org.apache.kafka.common.network.Selector)
> Completed connection to node 2. Fetching API versions. 
> (org.apache.kafka.clients.NetworkClient)
> Initiating API versions fetch from node 2. 
> (org.apache.kafka.clients.NetworkClient)
> Subscribed to topic(s): topic-test 
> (org.apache.kafka.clients.consumer.KafkaConsumer)
> Connection with / disconnected (org.apache.kafka.common.network.Selector)
> Node 2 disconnected. (org.apache.kafka.clients.NetworkClient) {code}
> The connection to the coordinator, the rebalance, and offset fetching have 
> finished. When the client prepares the connection to the leader for fetching, 
> the connection limit has already been reached, so after the TCP connection is 
> established, the broker disconnects the client.
>  
> The root cause of this issue is that consuming relies on a combination of 
> multiple connections (to the coordinator and to the leader on the same 
> broker) that is not atomic, which may lead to a "half connected" state. I 
> think we can make some improvements:
>  # reuse one connection for the coordinator and the leader when they are on 
> the same broker
>  # make the connection limit more flexible, for example by allowing extra 
> related connections from a consumer that has already connected to the broker 
> when the connection count limit has been reached
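
To illustrate the id derivation described above, here is a minimal Java sketch. It is not the KafkaConsumer source: the class name, the `openConnections` helper, and the placeholder address `broker-2.example:9092` are assumptions made for illustration (the logs above elide the real host). Only the formula Integer.MAX_VALUE - coordinator id and the node ids 2 and 2147483645 come from the issue.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class CoordinatorConnectionId {

    // Derivation described in the issue: Integer.MAX_VALUE minus the coordinator's broker id.
    static int coordinatorConnectionId(int brokerId) {
        return Integer.MAX_VALUE - brokerId;
    }

    // Hypothetical stand-in for a client-side connection table keyed by connection id.
    // Because the two ids differ, one broker address ends up with two entries.
    static Map<String, String> openConnections(int brokerId, String brokerAddress) {
        Map<String, String> connections = new LinkedHashMap<>();
        connections.put(String.valueOf(coordinatorConnectionId(brokerId)), brokerAddress); // coordinator
        connections.put(String.valueOf(brokerId), brokerAddress);                          // partition leader
        return connections;
    }

    public static void main(String[] args) {
        int brokerId = 2; // node id taken from the logs above
        System.out.println(coordinatorConnectionId(brokerId)); // 2147483645, as in "to node 2147483645"
        System.out.println(openConnections(brokerId, "broker-2.example:9092").size()); // 2
    }
}
```

With a table keyed this way, the coordinator and leader roles of one broker never share an entry, which is why a per-client connection limit on the broker can admit the first connection and reject the second.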



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-9296) Correlation id for response () does not match request ()

2022-04-06 Thread Von Gosling (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17518617#comment-17518617
 ] 

Von Gosling commented on KAFKA-9296:


[~junrao] Is there any update on this? I have received some reports of it from our products.

> Correlation id for response () does not match request ()
> 
>
> Key: KAFKA-9296
> URL: https://issues.apache.org/jira/browse/KAFKA-9296
> Project: Kafka
> Issue Type: Bug
> Components: clients, producer
> Affects Versions: 0.11.0.2
> Environment: Flink on k8s
> Reporter: Enhon Bryant
> Priority: Blocker
> Labels: kafka, producer
>
> The Kafka client and broker I use are both version 0.11.0.2. I use Kafka's 
> producer to write data to the broker and encountered the following exception.
> 2019-12-12 18:12:46,821 ERROR 
> org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka 
> producer I/O thread: 
> java.lang.IllegalStateException: Correlation id for response (11715816) does 
> not match request (11715804), request header: 
> {api_key=0,api_version=3,correlation_id=11715804,client_id=producer-3}
>  at org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:752)
>  at 
> org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:561)
>  at 
> org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:657)
>  at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:442)
>  at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:224)
>  at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
>  at java.lang.Thread.run(Thread.java:748)
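
For readers unfamiliar with the check that fails in the stack trace above, here is a simplified Java sketch of correlation-id matching. It is not the NetworkClient source: the class `InFlightCorrelation` and its methods are hypothetical, and the sketch assumes that responses on one connection are expected in the same order as the requests were sent. The two ids and the exception message format are taken from the report.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class InFlightCorrelation {

    // Correlation ids of requests sent on one connection, oldest first.
    private final Deque<Integer> inFlight = new ArrayDeque<>();

    // Record a request as sent.
    void send(int correlationId) {
        inFlight.addLast(correlationId);
    }

    // Match a response against the oldest in-flight request; fail if the ids differ.
    int complete(int responseCorrelationId) {
        Integer expected = inFlight.peekFirst();
        if (expected == null) {
            throw new IllegalStateException(
                "No in-flight request for response (" + responseCorrelationId + ")");
        }
        if (expected != responseCorrelationId) {
            throw new IllegalStateException("Correlation id for response ("
                + responseCorrelationId + ") does not match request (" + expected + ")");
        }
        return inFlight.pollFirst();
    }

    public static void main(String[] args) {
        InFlightCorrelation connection = new InFlightCorrelation();
        connection.send(11715804);
        try {
            connection.complete(11715816); // a response for a different request arrives first
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Under that ordering assumption, a mismatch means the response stream and the client's in-flight queue have gone out of sync, so the sketch treats it as an unrecoverable state error and does not skip ahead.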


