[ https://issues.apache.org/jira/browse/FLINK-34470?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
dongwoo.kim updated FLINK-34470: -------------------------------- Description: h2. Summary Hi we have faced issue with transactional message and table api kafka source. If we configure *'scan.bounded.mode'* to *'latest-offset'* flink sql request timeouts after hanging. We can always reproduce this unexpected behavior by following below steps. This is related to this [issue|https://issues.apache.org/jira/browse/FLINK-33484] too. h2. How to reproduce 1. Deploy transactional producer and stop after producing certain amount of messages 2. Configure *'scan.bounded.mode'* to *'latest-offset'* and submit simple query such as getting count of the produced messages 3. Flink sql job gets stucked and timeouts. h2. Cause Transaction producer always produces [control records|https://kafka.apache.org/documentation/#controlbatch] at the end of the transaction. And these control messages are not polled by {*}consumer.poll(){*}. (It is filtered internally). In *KafkaPartitionSplitReader* code it finishes split only when *lastRecord.offset() >= stoppingOffset - 1* condition is met. This might work well with non transactional messages or streaming environment but in some batch use cases it causes unexpected hanging. h2. Proposed solution Adding *consumer.position(tp) >= stoppingOffset* condition to the if statement [here|https://github.com/apache/flink-connector-kafka/blob/15f2662eccf461d9d539ed87a78c9851cd17fa43/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L137] can solve the problem. By this KafkaPartitionSplitReader is available to finish the split even when the stopping offset is configured to control record's offset. I would be happy to implement about this fix if we can reach on agreement. Thanks was: h2. Summary Hi we have faced issue with transactional message and table api kafka source. If we configure *'scan.bounded.mode'* to *'latest-offset'* flink sql request timeouts after hanging. We can always reproduce this unexpected behavior by following below steps. This is related to this [issue|https://issues.apache.org/jira/browse/FLINK-33484] too. h2. How to reproduce 1. Deploy transactional producer and stop after producing certain amount of messages 2. Configure *'scan.bounded.mode'* to *'latest-offset'* and submit simple query such as getting count of the produced messages 3. Flink sql job gets stucked and timeouts. h2. Cause Transaction producer always produces [control records|https://kafka.apache.org/documentation/#controlbatch] at the end of the transaction. And these control messages are not polled by {*}consumer.poll(){*}. (It is filtered internally). In *KafkaPartitionSplitReader* code it finishes split only when *lastRecord.offset() >= stoppingOffset - 1* condition is met. This might work well with non transactional messages or streaming environment but in some batch use cases it causes unexpected hanging. h2. Proposed solution Adding *consumer.position(tp) >= stoppingOffset* condition to the if statement. By this KafkaPartitionSplitReader is available to finish the split even when the stopping offset is configured to control record's offset. I would be happy to implement about this fix if we can reach on agreement. Thanks > Transactional message + Table api kafka source with 'latest-offset' scan > bound mode causes indefinitely hanging > --------------------------------------------------------------------------------------------------------------- > > Key: FLINK-34470 > URL: https://issues.apache.org/jira/browse/FLINK-34470 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka > Affects Versions: 1.17.1 > Reporter: dongwoo.kim > Priority: Major > > h2. Summary > Hi we have faced issue with transactional message and table api kafka source. > If we configure *'scan.bounded.mode'* to *'latest-offset'* flink sql request > timeouts after hanging. We can always reproduce this unexpected behavior by > following below steps. > This is related to this > [issue|https://issues.apache.org/jira/browse/FLINK-33484] too. > h2. How to reproduce > 1. Deploy transactional producer and stop after producing certain amount of > messages > 2. Configure *'scan.bounded.mode'* to *'latest-offset'* and submit simple > query such as getting count of the produced messages > 3. Flink sql job gets stucked and timeouts. > h2. Cause > Transaction producer always produces [control > records|https://kafka.apache.org/documentation/#controlbatch] at the end of > the transaction. And these control messages are not polled by > {*}consumer.poll(){*}. (It is filtered internally). In > *KafkaPartitionSplitReader* code it finishes split only when > *lastRecord.offset() >= stoppingOffset - 1* condition is met. This might work > well with non transactional messages or streaming environment but in some > batch use cases it causes unexpected hanging. > h2. Proposed solution > Adding *consumer.position(tp) >= stoppingOffset* condition to the if > statement > [here|https://github.com/apache/flink-connector-kafka/blob/15f2662eccf461d9d539ed87a78c9851cd17fa43/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L137] > can solve the problem. > By this KafkaPartitionSplitReader is available to finish the split even when > the stopping offset is configured to control record's offset. > I would be happy to implement about this fix if we can reach on agreement. > Thanks -- This message was sent by Atlassian Jira (v8.20.10#820010)