dongwoo.kim created FLINK-34470:
-----------------------------------

             Summary: Transactional message + Table API Kafka source with 'latest-offset' scan bounded mode causes indefinite hanging
                 Key: FLINK-34470
                 URL: https://issues.apache.org/jira/browse/FLINK-34470
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kafka
    Affects Versions: 1.17.1
            Reporter: dongwoo.kim


h2. Summary  
Hi, we have run into an issue with transactional messages and the Table API
Kafka source. If we set *'scan.bounded.mode'* to *'latest-offset'*, the Flink SQL
request hangs and eventually times out. We can reproduce this unexpected
behavior every time with the steps below.


h2. How to reproduce
1. Run a transactional producer and stop it after it has produced a certain
number of messages.
2. Set *'scan.bounded.mode'* to *'latest-offset'* and submit a simple query
such as count(*).
3. The Flink SQL job gets stuck and times out.
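For step 2, a table definition along the following lines should be enough to hit the problem. This is only a sketch that builds the Flink SQL strings in Java; the table name tx_source, the payload column, the topic, the bootstrap servers and the 'raw' format are placeholders chosen for illustration, not our actual setup. Step 1 (the transactional producer) is not shown.

```java
/**
 * Hypothetical reproduction setup for step 2: builds a Flink SQL table
 * definition with 'scan.bounded.mode' = 'latest-offset' plus a count query.
 * Table name, column, topic, servers and format are placeholders.
 */
public class ReproDdl {

    // Bounded Kafka source that reads from the earliest offset up to the
    // latest offset observed when the job starts.
    static String createTableDdl(String topic, String bootstrapServers) {
        return """
                CREATE TABLE tx_source (
                  payload STRING
                ) WITH (
                  'connector' = 'kafka',
                  'topic' = '%s',
                  'properties.bootstrap.servers' = '%s',
                  'scan.startup.mode' = 'earliest-offset',
                  'scan.bounded.mode' = 'latest-offset',
                  'format' = 'raw'
                )""".formatted(topic, bootstrapServers);
    }

    // The simple query from step 2.
    static final String COUNT_QUERY = "SELECT COUNT(*) FROM tx_source";

    public static void main(String[] args) {
        System.out.println(createTableDdl("tx-topic", "localhost:9092"));
        System.out.println(COUNT_QUERY);
    }
}
```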

h2. Cause
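A transactional producer always writes [control
records|https://kafka.apache.org/documentation/#controlbatch] (commit or abort
markers) at the end of each transaction. These control records occupy an offset
in the partition but are never returned by {*}consumer.poll(){*}; they are
filtered out internally. *KafkaPartitionSplitReader* finishes a split only when
the condition *lastRecord.offset() >= stoppingOffset - 1* is met. With
'latest-offset', the stopping offset is the partition's end offset, so when the
last thing the producer wrote is a commit marker, offset stoppingOffset - 1
belongs to that control record. The last record the reader can ever receive is
then at stoppingOffset - 2 or earlier, and the condition is never met. This works
fine with non-transactional messages or in a streaming setup, but in some batch
use cases it causes unexpected hanging.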
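The offset arithmetic above can be illustrated with a small, self-contained simulation. This is a hypothetical model, not the actual Kafka consumer or *KafkaPartitionSplitReader* code; the class and method names are made up for illustration, and it assumes a single committed transaction with data records at offsets 0-4 and the commit marker at offset 5.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of one partition of a transactional topic. It only
 * mimics the offset arithmetic; it is not the real consumer or split reader.
 */
public class ControlRecordHang {

    // One log entry: either a data record or a transaction control marker.
    record Entry(long offset, boolean isControl) {}

    // Offsets a consumer would actually receive: control records occupy an
    // offset but are filtered out before poll() returns.
    static List<Long> visibleDataOffsets(List<Entry> log) {
        List<Long> visible = new ArrayList<>();
        for (Entry e : log) {
            if (!e.isControl()) {
                visible.add(e.offset());
            }
        }
        return visible;
    }

    // The existing finish condition, evaluated against the last polled record.
    static boolean oldConditionMet(long lastRecordOffset, long stoppingOffset) {
        return lastRecordOffset >= stoppingOffset - 1;
    }

    public static void main(String[] args) {
        // Offsets 0-4: data records of one committed transaction; offset 5: commit marker.
        List<Entry> log = new ArrayList<>();
        for (long i = 0; i < 5; i++) {
            log.add(new Entry(i, false));
        }
        log.add(new Entry(5, true));

        // 'latest-offset' resolves to the end offset, one past the last entry.
        long stoppingOffset = log.size();

        List<Long> visible = visibleDataOffsets(log);
        long lastRecordOffset = visible.get(visible.size() - 1);

        System.out.println("stoppingOffset=" + stoppingOffset
                + " lastRecordOffset=" + lastRecordOffset
                + " finished=" + oldConditionMet(lastRecordOffset, stoppingOffset));
    }
}
```

Running it prints stoppingOffset=6 lastRecordOffset=4 finished=false: offset 4 is the last record the reader ever gets, so the check never passes and every later poll returns nothing.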

h2. Proposed solution
Add a *consumer.position(tp) >= stoppingOffset* check to that if statement. The
consumer's position still moves past control records even though poll() does
not return them, so this lets *KafkaPartitionSplitReader* finish the split even
when the offset right before the stopping offset belongs to a control record.
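A rough sketch of the combined check, assuming data records at offsets 0-4, the commit marker at offset 5 and a stopping offset of 6. *shouldFinishSplit* and its parameters are illustrative names only, and the two simulated polls assume the position reaches 6 on a later, empty poll; the real reader code is structured differently.

```java
import java.util.OptionalLong;

/**
 * Hypothetical sketch of the proposed stopping check. Names and parameters
 * are illustrative; the real KafkaPartitionSplitReader code differs.
 */
public class ProposedStopCheck {

    // Finish when the last data record reached the stop offset (existing check)
    // or when the consumer position reached it (proposed additional check).
    // lastRecordOffset is empty when a poll returned no records.
    static boolean shouldFinishSplit(OptionalLong lastRecordOffset, long position, long stoppingOffset) {
        boolean lastRecordReached = lastRecordOffset.isPresent()
                && lastRecordOffset.getAsLong() >= stoppingOffset - 1;
        boolean positionReached = position >= stoppingOffset;
        return lastRecordReached || positionReached;
    }

    public static void main(String[] args) {
        long stoppingOffset = 6; // end offset: data at 0-4, commit marker at 5

        // Assumed poll 1: records 0-4 returned, position not yet past the marker.
        System.out.println(shouldFinishSplit(OptionalLong.of(4), 5, stoppingOffset)); // false
        // Assumed poll 2: no records returned, position moved past the marker.
        System.out.println(shouldFinishSplit(OptionalLong.empty(), 6, stoppingOffset)); // true
    }
}
```

The record-based check is kept, so non-transactional topics still finish as soon as the record at stoppingOffset - 1 is read; the position check only adds a second exit for the case where that offset is a control record.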



--
This message was sent by Atlassian Jira
(v8.20.10#820010)