[ 
https://issues.apache.org/jira/browse/FLINK-34470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17818490#comment-17818490
 ] 

Martijn Visser commented on FLINK-34470:
----------------------------------------

[~dongwoo.kim] Can you please verify this with the latest version of the Flink 
Kafka connector?

> Transactional message + Table api kafka source with 'latest-offset' scan 
> bound mode causes indefinitely hanging
> ---------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-34470
>                 URL: https://issues.apache.org/jira/browse/FLINK-34470
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.17.1
>            Reporter: dongwoo.kim
>            Priority: Major
>
> h2. Summary  
> Hi, we have run into an issue with transactional messages and the Table API 
> Kafka source. If we configure *'scan.bounded.mode'* to *'latest-offset'*, the 
> Flink SQL request hangs and eventually times out. We can always reproduce 
> this behavior with the steps below.
> h2. How to reproduce
> 1. Deploy a transactional producer and stop it after it has produced a 
> certain number of messages.
> 2. Configure *'scan.bounded.mode'* to *'latest-offset'* and submit a simple 
> query such as count(*).
> 3. The Flink SQL job gets stuck and times out.
> h2. Cause
> A transactional producer always writes [control 
> records|https://kafka.apache.org/documentation/#controlbatch] at the end of 
> a transaction. These control records are never returned by 
> {*}consumer.poll(){*}; they are filtered out internally. 
> *KafkaPartitionSplitReader* finishes a split only when the condition 
> *lastRecord.offset() >= stoppingOffset - 1* is met. That works for 
> non-transactional messages and for streaming jobs, but when the last offset 
> before the stopping offset belongs to a control record, the condition is 
> never satisfied and a bounded (batch) read hangs.
> h2. Proposed solution
> Add a *consumer.position(tp) >= stoppingOffset* condition to the if 
> statement. 
> With this check, KafkaPartitionSplitReader can finish the split even when 
> the stopping offset corresponds to a control record's offset.
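
The two conditions quoted above can be compared with a small, self-contained
sketch. It is a toy model, not the KafkaPartitionSplitReader code: the class
StoppingOffsetSketch and the methods simulate, finishedByLastRecord and
finishedWithPosition are hypothetical names made up for illustration. It
assumes, as the proposal does, that the consumer position moves past the
control record even though poll() never returns it, and that the new check is
combined with the existing one by a logical OR.

```java
/** Toy model of the stop condition; not the actual Flink or Kafka classes. */
public class StoppingOffsetSketch {

    /**
     * Models reading one partition that holds dataRecords data records
     * (offsets 0 .. dataRecords - 1) followed by a single control record.
     * Returns {offset of the last record handed to the reader, consumer position}.
     */
    public static long[] simulate(int dataRecords) {
        long lastRecordOffset = -1L; // nothing returned by poll() yet
        long position = 0L;
        for (long offset = 0; offset < dataRecords; offset++) {
            lastRecordOffset = offset; // data record is visible to the reader
            position = offset + 1;
        }
        // The control record sits at offset dataRecords. It is filtered out,
        // so lastRecordOffset does not change, but the position (assumed here)
        // advances past it.
        position = dataRecords + 1L;
        return new long[] {lastRecordOffset, position};
    }

    /** Condition as quoted in the issue description. */
    public static boolean finishedByLastRecord(long lastRecordOffset, long stoppingOffset) {
        return lastRecordOffset >= stoppingOffset - 1;
    }

    /** Same condition plus the proposed position check (OR is an assumption). */
    public static boolean finishedWithPosition(
            long lastRecordOffset, long position, long stoppingOffset) {
        return lastRecordOffset >= stoppingOffset - 1 || position >= stoppingOffset;
    }

    public static void main(String[] args) {
        int dataRecords = 5;
        long stoppingOffset = dataRecords + 1L; // end offset including the control record
        long[] state = simulate(dataRecords);
        System.out.println("last record offset: " + state[0]);
        System.out.println("consumer position:  " + state[1]);
        System.out.println("original check:     "
                + finishedByLastRecord(state[0], stoppingOffset));
        System.out.println("with position:      "
                + finishedWithPosition(state[0], state[1], stoppingOffset));
    }
}
```

In this model the last visible record stays one offset short of
stoppingOffset - 1, so the original check alone can never become true, while
the position-based check does.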



--
This message was sent by Atlassian Jira
(v8.20.10#820010)