dongwoo.kim created FLINK-34470:
-----------------------------------

Summary: Transactional message + Table api kafka source with 'latest-offset' scan bound mode causes indefinitely hanging
Key: FLINK-34470
URL: https://issues.apache.org/jira/browse/FLINK-34470
Project: Flink
Issue Type: Bug
Components: Connectors / Kafka
Affects Versions: 1.17.1
Reporter: dongwoo.kim
h2. Summary
Hi, we have run into an issue with transactional messages and the Table API Kafka source. If *'scan.bounded.mode'* is set to *'latest-offset'*, the Flink SQL request hangs and eventually times out. We can reproduce this behavior every time with the steps below.

h2. How to reproduce
1. Deploy a transactional producer and stop it after it has produced a certain number of messages.
2. Set *'scan.bounded.mode'* to *'latest-offset'* and submit a simple query such as count(*).
3. The Flink SQL job gets stuck and times out.

h2. Cause
A transactional producer always writes [control records|https://kafka.apache.org/documentation/#controlbatch] at the end of each transaction. These control records are never returned by {*}consumer.poll(){*} because the consumer filters them out internally. *KafkaPartitionSplitReader* finishes a split only when the condition *lastRecord.offset() >= stoppingOffset - 1* is met. When the last entry in the partition is a control record, the offset stoppingOffset - 1 belongs to that control record, so no polled record ever satisfies the condition. This may work well with non-transactional messages or in a streaming environment, but in some batch use cases it makes the job hang unexpectedly.

h2. Proposed solution
Add a *consumer.position(tp) >= stoppingOffset* condition to the if statement. With this, KafkaPartitionSplitReader can finish the split even when the offset just before the stopping offset belongs to a control record.
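The offset arithmetic behind the hang and the proposed fix can be illustrated with a minimal, self-contained simulation in plain Java (no Kafka or Flink dependencies). It is only a sketch: the names StoppingOffsetDemo, simulatePoll, finishedByLastRecord and finishedProposed are hypothetical, and the loop simplifies real consumer behavior by assuming the consumer position has already advanced past the commit marker when the check runs.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical simulation (not Flink code) of the split-finish check for a
 * partition whose last entry is a transaction control record.
 */
public class StoppingOffsetDemo {

    /**
     * Simulates draining a partition up to stoppingOffset. Returns
     * {last polled record offset, consumer position}. The control record at
     * controlRecordOffset advances the position but is never handed to the
     * caller, mirroring the internal filtering in consumer.poll().
     * Assumption: the position has moved past every fetched offset.
     */
    static long[] simulatePoll(long stoppingOffset, long controlRecordOffset) {
        List<Long> polled = new ArrayList<>();
        long position = 0;
        for (long offset = 0; offset < stoppingOffset; offset++) {
            position = offset + 1;
            if (offset != controlRecordOffset) {
                polled.add(offset);
            }
        }
        long last = polled.isEmpty() ? -1 : polled.get(polled.size() - 1);
        return new long[] {last, position};
    }

    /** Current check: finish only when a polled record reaches stoppingOffset - 1. */
    static boolean finishedByLastRecord(long lastRecordOffset, long stoppingOffset) {
        return lastRecordOffset >= stoppingOffset - 1;
    }

    /** Proposed check: also finish once the consumer position reaches stoppingOffset. */
    static boolean finishedProposed(long lastRecordOffset, long position, long stoppingOffset) {
        return lastRecordOffset >= stoppingOffset - 1 || position >= stoppingOffset;
    }

    public static void main(String[] args) {
        // Offsets 0..4 hold data records, offset 5 holds the commit marker,
        // so the 'latest-offset' stopping offset is 6.
        long stoppingOffset = 6;
        long[] result = simulatePoll(stoppingOffset, 5);
        long lastRecordOffset = result[0];
        long position = result[1];
        System.out.println("last polled record offset = " + lastRecordOffset);
        System.out.println("consumer position = " + position);
        System.out.println("current check finishes split: "
                + finishedByLastRecord(lastRecordOffset, stoppingOffset));
        System.out.println("proposed check finishes split: "
                + finishedProposed(lastRecordOffset, position, stoppingOffset));
    }
}
```

With the commit marker at offset 5 and a stopping offset of 6, the current check compares 4 >= 5 and never succeeds, while the position check compares 6 >= 6 and lets the split finish. Keeping the original lastRecord condition in the OR leaves the behavior unchanged for non-transactional partitions.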