Arvid Heise created FLINK-36434:
-----------------------------------
Summary: Revise threading model of (KafkaPartition)SplitReader
Key: FLINK-36434
URL: https://issues.apache.org/jira/browse/FLINK-36434
Project: Flink
Issue Type: Bug
Components: Connectors / Kafka
Affects Versions: kafka-3.2.0
Reporter: Arvid Heise
The KafkaPartitionSplitReader is created in the source thread, where it also initializes the consumer. However, it later accesses the consumer almost exclusively from the fetcher thread. Since the consumer is not thread-safe, this threading model looks broken.
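A minimal sketch of the hazard described above. It assumes a hypothetical ThreadConfinedClient that stands in for the consumer and, purely for illustration, rejects any call from a thread other than the one that created it. This check is stricter than Kafka's own guard: the real KafkaConsumer only throws a ConcurrentModificationException when it detects concurrent access from multiple threads. All class and method names below are illustrative assumptions, not Flink or Kafka APIs.

{code:java}
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CrossThreadAccessDemo {

    // Hypothetical stand-in for a non-thread-safe client such as a consumer.
    // It remembers the thread that created it and refuses calls from any other thread.
    static final class ThreadConfinedClient {
        private final Thread owner = Thread.currentThread();

        long position() {
            if (Thread.currentThread() != owner) {
                throw new IllegalStateException(
                        "created on " + owner.getName()
                                + " but accessed from " + Thread.currentThread().getName());
            }
            return 0L;
        }
    }

    // Creates the client on the calling ("source") thread, then uses it from a
    // separate "fetcher" thread, mirroring the pattern described in this issue.
    static String createHereUseElsewhere() {
        ThreadConfinedClient client = new ThreadConfinedClient();
        ExecutorService fetcher =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "fetcher-thread"));
        try {
            Callable<Long> task = client::position; // executed on fetcher-thread
            fetcher.submit(task).get();
            return "no error";
        } catch (ExecutionException e) {
            // The IllegalStateException thrown on the fetcher thread arrives wrapped.
            return e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            fetcher.shutdown();
        }
    }

    public static void main(String[] args) {
        Thread.currentThread().setName("source-thread");
        System.out.println(createHereUseElsewhere());
    }
}
{code}

Running main prints "created on source-thread but accessed from fetcher-thread", i.e. the guard catches exactly the create-in-one-thread, use-in-another split.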
More broadly, I'd argue that the overall SplitReader design is already suboptimal, as the same issue probably affects other connectors as well. I'd first create the fetch task and then create the split reader within the fetch task.
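The proposed ordering (create the fetch task first, then create the split reader inside it) could look roughly like the sketch below. It assumes hypothetical Reader and FetchTask classes and a hypothetical runFetcher helper; none of them are Flink's SplitFetcher API. The key point is that the fetch task receives a factory (a java.util.function.Supplier) instead of an already-constructed reader, so the reader and the client it wraps are created on the same thread that later uses them.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

public class FetcherOwnedReaderDemo {

    // Hypothetical stand-in for a split reader wrapping a non-thread-safe client.
    static final class Reader {
        private final Thread owner = Thread.currentThread();

        String fetch() {
            if (Thread.currentThread() != owner) {
                throw new IllegalStateException("reader used outside its owning thread");
            }
            return "fetched on " + owner.getName();
        }
    }

    // Hypothetical fetch task: it gets a factory rather than a ready-made reader,
    // so the reader is constructed on the fetcher thread inside call().
    static final class FetchTask implements Callable<List<String>> {
        private final Supplier<Reader> readerFactory;
        private final int rounds;

        FetchTask(Supplier<Reader> readerFactory, int rounds) {
            this.readerFactory = readerFactory;
            this.rounds = rounds;
        }

        @Override
        public List<String> call() {
            Reader reader = readerFactory.get(); // created on the fetcher thread
            List<String> records = new ArrayList<>();
            for (int i = 0; i < rounds; i++) {
                records.add(reader.fetch()); // used on the same thread
            }
            return records;
        }
    }

    static List<String> runFetcher(int rounds) {
        ExecutorService fetcher =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "fetcher-thread"));
        try {
            return fetcher.submit(new FetchTask(Reader::new, rounds)).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for fetcher", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("fetch failed", e.getCause());
        } finally {
            fetcher.shutdown();
        }
    }

    public static void main(String[] args) {
        runFetcher(3).forEach(System.out::println);
    }
}
{code}

Passing a factory keeps construction and every later call on one thread, so the client's internal state never has to be handed over from the source thread to the fetcher thread.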
If left as-is, we can't upgrade the Kafka client anymore because we sporadically receive:
{code:java}
Caused by: org.apache.kafka.common.requests.CorrelationIdMismatchException: Correlation id for response (1179651) does not match request (0), request header: RequestHeader(apiKey=API_VERSIONS, apiVersion=3, clientId=kafka-source-external-context-6092797646400842179-3, correlationId=0, headerVersion=2)
	at org.apache.kafka.common.requests.AbstractResponse.parseResponse(AbstractResponse.java:106)
	at org.apache.kafka.clients.NetworkClient.parseResponse(NetworkClient.java:740)
	at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:913)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:580)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:280)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:251)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242)
	at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1728)
	at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1686)
	at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.lambda$removeEmptySplits$5(KafkaPartitionSplitReader.java:375)
	at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.retryOnWakeup(KafkaPartitionSplitReader.java:481)
	at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.removeEmptySplits(KafkaPartitionSplitReader.java:374)
	at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.handleSplitsChanges(KafkaPartitionSplitReader.java:224)
	at org.apache.flink.connector.base.source.reader.fetcher.AddSplitsTask.run(AddSplitsTask.java:51)
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
	... 6 more
{code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)