[ https://issues.apache.org/jira/browse/FLINK-34470?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
dongwoo.kim updated FLINK-34470:
--------------------------------
Description:
h2. Summary
Hi, we have faced an issue with transactional messages and the Table API Kafka source. If we configure *'scan.bounded.mode'* to *'latest-offset'*, the Flink SQL request hangs and then times out. We can always reproduce this unexpected behavior by following the steps below. This is also related to [FLINK-33484|https://issues.apache.org/jira/browse/FLINK-33484].

h2. How to reproduce
1. Deploy a transactional producer and stop it after producing a certain number of messages.
2. Configure *'scan.bounded.mode'* to *'latest-offset'* and submit a simple query, such as counting the produced messages.
3. The Flink SQL job gets stuck and times out.

h2. Cause
A transactional producer always writes [control records|https://kafka.apache.org/documentation/#controlbatch] at the end of each transaction, and these control records are never returned by {*}consumer.poll(){*} (they are filtered internally). In *KafkaPartitionSplitReader*, a split is finished only when the condition *lastRecord.offset() >= stoppingOffset - 1* is met. This works for non-transactional messages and in streaming environments, but in some batch use cases it causes the job to hang, because the last offset of the partition belongs to a control record that the reader never sees.

h2. Proposed solution
{code:java}
if (consumer.position(tp) >= stoppingOffset) {
    recordsBySplits.setPartitionStoppingOffset(tp, stoppingOffset);
    finishSplitAtRecord(
            tp,
            stoppingOffset,
            lastRecord.offset(),
            finishedPartitions,
            recordsBySplits);
}
{code}
Replacing the if condition with *consumer.position(tp) >= stoppingOffset* [here|https://github.com/apache/flink-connector-kafka/blob/15f2662eccf461d9d539ed87a78c9851cd17fa43/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L137] solves the problem. *consumer.position(tp)* returns the offset of the next record to be fetched, so it advances past control records even though they are never returned to the caller.
With this change, KafkaPartitionSplitReader can finish the split even when the stopping offset points at a control record's offset. I would be happy to implement this fix if we can reach an agreement. Thanks
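For illustration, step 2 of the reproduction can be written as a minimal Flink SQL job. The table name, schema, topic, and bootstrap servers below are placeholder assumptions; only the scan options matter for reproducing the hang:

{code:sql}
-- Hypothetical source table; topic, servers, and schema are placeholders.
CREATE TABLE tx_messages (
    id BIGINT,
    payload STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'tx-topic',
    'properties.bootstrap.servers' = 'localhost:9092',
    -- read only committed transactional messages
    'properties.isolation.level' = 'read_committed',
    'scan.startup.mode' = 'earliest-offset',
    -- bounded read up to the latest offset: this is what triggers the hang
    'scan.bounded.mode' = 'latest-offset',
    'format' = 'json'
);

-- A simple bounded query that gets stuck and eventually times out
SELECT COUNT(*) FROM tx_messages;
{code}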
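The offset arithmetic behind the hang, and why the proposed condition fixes it, can be sketched in plain Java with hypothetical offsets (no Kafka dependency; assume 5 data records followed by one commit marker):

{code:java}
public class StoppingConditionSketch {
    public static void main(String[] args) {
        long producedRecords = 5;                      // data records at offsets 0..4
        long controlRecordOffset = producedRecords;    // commit marker occupies offset 5
        long stoppingOffset = controlRecordOffset + 1; // log end offset 6, used by 'latest-offset'

        // consumer.poll() filters out the control record, so the last record
        // the split reader ever sees is the last data record:
        long lastRecordOffset = producedRecords - 1;   // 4

        // Current condition in KafkaPartitionSplitReader: 4 >= 6 - 1 is false,
        // and no further records will ever arrive, so the split never finishes.
        boolean current = lastRecordOffset >= stoppingOffset - 1;

        // Proposed condition: the consumer's position has already advanced
        // past the control record to the log end offset, so 6 >= 6 is true.
        long consumerPosition = stoppingOffset;
        boolean proposed = consumerPosition >= stoppingOffset;

        System.out.println("current condition finishes split:  " + current);  // false
        System.out.println("proposed condition finishes split: " + proposed); // true
    }
}
{code}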
> Transactional message + Table API Kafka source with 'latest-offset' scan
> bounded mode causes indefinite hanging
> ---------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-34470
>                 URL: https://issues.apache.org/jira/browse/FLINK-34470
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.17.1
>            Reporter: dongwoo.kim
>            Priority: Major
>

--
This message was sent by Atlassian Jira
(v8.20.10#820010)