[ https://issues.apache.org/jira/browse/FLINK-34470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17818490#comment-17818490 ]
Martijn Visser commented on FLINK-34470:
----------------------------------------

[~dongwoo.kim] Can you please verify this with the latest version of the Flink Kafka connector?

> Transactional message + Table api kafka source with 'latest-offset' scan bound mode causes indefinitely hanging
> ---------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-34470
>                 URL: https://issues.apache.org/jira/browse/FLINK-34470
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.17.1
>            Reporter: dongwoo.kim
>            Priority: Major
>
> h2. Summary
> Hi, we have run into an issue with transactional messages and the Table API Kafka source.
> If we set *'scan.bounded.mode'* to *'latest-offset'*, the Flink SQL request
> hangs and eventually times out. We can always reproduce this unexpected behavior
> with the steps below.
> h2. How to reproduce
> 1. Deploy a transactional producer and stop it after it has produced a certain number of
> messages.
> 2. Set *'scan.bounded.mode'* to *'latest-offset'* and submit a simple
> query such as count(*).
> 3. The Flink SQL job gets stuck and times out.
> h2. Cause
> A transactional producer always writes [control records|https://kafka.apache.org/documentation/#controlbatch]
> at the end of each transaction. These control records are not returned by
> {*}consumer.poll(){*} (they are filtered out internally). In
> *KafkaPartitionSplitReader*, a split is finished only when the
> *lastRecord.offset() >= stoppingOffset - 1* condition is met. This may work
> with non-transactional messages or in a streaming environment, but in some
> batch use cases it causes unexpected hanging.
> h2. Proposed solution
> Add a *consumer.position(tp) >= stoppingOffset* condition to the if
> statement.
> This lets KafkaPartitionSplitReader finish the split even when the
> stopping offset is set to a control record's offset.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
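The issue proposes changing the split-finishing condition. The sketch below illustrates that condition with a simulation that has no Kafka dependency. It is not the connector's code. `StoppingOffsetSketch`, `SimulatedConsumer` and `readSplit` are hypothetical names, and the sketch models a single partition.

The sketch rests on these assumptions:
* Offsets are contiguous from 0.
* `poll()` hides control records but still advances `position()`, as the issue describes.
* `'latest-offset'` resolves to the log end offset.
* The commit marker occupies the last offset.
* The batch size of 2 is arbitrary.

```java
import java.util.ArrayList;
import java.util.List;

public class StoppingOffsetSketch {

    /**
     * Hypothetical stand-in for a single-partition consumer. Index i of isControl
     * describes offset i. poll() hides control records, while position() still
     * advances past them.
     */
    static final class SimulatedConsumer {
        private final boolean[] isControl;
        private long position = 0;

        SimulatedConsumer(boolean[] isControl) {
            this.isControl = isControl;
        }

        /** Returns the offsets of up to maxRecords data records, skipping control records. */
        List<Long> poll(int maxRecords) {
            List<Long> offsets = new ArrayList<>();
            while (position < isControl.length && offsets.size() < maxRecords) {
                long offset = position++;
                if (!isControl[(int) offset]) {
                    offsets.add(offset);
                }
            }
            return offsets;
        }

        long position() {
            return position;
        }
    }

    /**
     * Polls until the split counts as finished, giving up after maxPolls
     * (where a real reader would keep polling). Returns true if it finished.
     */
    static boolean readSplit(boolean[] isControl, long stoppingOffset,
                             boolean usePositionCheck, int maxPolls) {
        SimulatedConsumer consumer = new SimulatedConsumer(isControl);
        for (int i = 0; i < maxPolls; i++) {
            List<Long> batch = consumer.poll(2);
            // Condition quoted in the issue: the last returned record reached stoppingOffset - 1.
            if (!batch.isEmpty() && batch.get(batch.size() - 1) >= stoppingOffset - 1) {
                return true;
            }
            // Proposed extra condition: the consumer position reached stoppingOffset.
            if (usePositionCheck && consumer.position() >= stoppingOffset) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Offsets 0-2 are data records; offset 3 is the transaction's commit marker.
        boolean[] log = {false, false, false, true};
        long stoppingOffset = 4; // assumed: 'latest-offset' resolves to the log end offset

        System.out.println("lastRecord check only: finished=" + readSplit(log, stoppingOffset, false, 10));
        System.out.println("with position check:   finished=" + readSplit(log, stoppingOffset, true, 10));
    }
}
```

With only the `lastRecord` condition, the last data record returned has offset 2. That offset never reaches `stoppingOffset - 1 = 3`, so the simulated split never finishes. With the position check added, the position reaches 4 after the commit marker is skipped, and the split finishes. With an all-data log (no control record), the original condition alone is enough.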