[ 
https://issues.apache.org/jira/browse/FLINK-34470?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dongwoo.kim updated FLINK-34470:
--------------------------------
    Description: 
h2. Summary  
Hi, we have run into an issue with transactional messages and the Table API 
Kafka source. If we configure *'scan.bounded.mode'* to *'latest-offset'*, the 
Flink SQL request hangs and eventually times out. We can always reproduce this 
unexpected behavior by following the steps below.
This is also related to this 
[issue|https://issues.apache.org/jira/browse/FLINK-33484].

h2. How to reproduce
1. Deploy a transactional producer and stop it after it has produced a certain 
number of messages.
2. Configure *'scan.bounded.mode'* to *'latest-offset'* and submit a simple 
query, such as counting the produced messages.
3. The Flink SQL job gets stuck and times out.

h2. Cause
A transactional producer always writes [control 
records|https://kafka.apache.org/documentation/#controlbatch] at the end of a 
transaction. These control records are never returned by 
{*}consumer.poll(){*} (they are filtered out internally). The 
*KafkaPartitionSplitReader* code finishes a split only when the condition 
*lastRecord.offset() >= stoppingOffset - 1* is met. With *'latest-offset'*, the 
stopping offset is taken from the end of the partition, so when the last entry 
in the log is a control record, no polled record can have the offset 
*stoppingOffset - 1* and the condition is never satisfied. The check works for 
non-transactional messages or in a streaming environment, but in some batch use 
cases it causes an unexpected hang.
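
The offset arithmetic behind the hang can be sketched in plain Java. This is only an illustrative model, not the connector code: the class {{ControlRecordHang}} and its helpers are hypothetical, no Kafka client is involved, and it assumes a partition with three data records followed by a single commit marker.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of one partition log; hypothetical, no Kafka client involved. */
public class ControlRecordHang {

    /** Offsets an application would see: the trailing control record is skipped. */
    static List<Long> polledOffsets(long dataRecords) {
        List<Long> offsets = new ArrayList<>();
        for (long offset = 0; offset < dataRecords; offset++) {
            offsets.add(offset);
        }
        return offsets;
    }

    /** The finish condition quoted in the description. */
    static boolean finishesSplit(long lastRecordOffset, long stoppingOffset) {
        return lastRecordOffset >= stoppingOffset - 1;
    }

    public static void main(String[] args) {
        long dataRecords = 3;                    // data records at offsets 0, 1, 2
        long controlOffset = dataRecords;        // assumed commit marker at offset 3
        long stoppingOffset = controlOffset + 1; // assumed end of the log: 4

        List<Long> polled = polledOffsets(dataRecords);
        long lastRecordOffset = polled.get(polled.size() - 1); // 2

        // 2 >= 4 - 1 is false, and no further record will arrive to change that.
        System.out.println(finishesSplit(lastRecordOffset, stoppingOffset)); // false
    }
}
```

In this model the check would only pass for a record at offset 3, which is exactly the offset occupied by the control record.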

h2. Proposed solution
Adding the condition *consumer.position(tp) >= stoppingOffset* to the if statement 
[here|https://github.com/apache/flink-connector-kafka/blob/15f2662eccf461d9d539ed87a78c9851cd17fa43/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L137]
 can solve the problem. 
With this change, KafkaPartitionSplitReader can finish the split even when the 
stopping offset is set to a control record's offset. 
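
A rough sketch of the combined check, again in plain Java rather than the real reader code. The class and method names are hypothetical, the offsets are passed in as plain numbers instead of being read from a consumer, and the sketch assumes that the consumer position moves past a control record once it has been fetched.

```java
/** Hypothetical stand-in for the split-finish check; not the connector's code. */
public class SplitFinishCheck {

    /** Existing condition: looks only at the last record returned by poll(). */
    static boolean reachedByRecord(long lastRecordOffset, long stoppingOffset) {
        return lastRecordOffset >= stoppingOffset - 1;
    }

    /** Proposed condition: additionally accepts the consumer position. */
    static boolean reachedByRecordOrPosition(
            long lastRecordOffset, long consumerPosition, long stoppingOffset) {
        return reachedByRecord(lastRecordOffset, stoppingOffset)
                || consumerPosition >= stoppingOffset;
    }

    public static void main(String[] args) {
        long lastRecordOffset = 2; // last data record seen by the application
        long consumerPosition = 4; // assumed position after skipping the control record at 3
        long stoppingOffset = 4;   // assumed end of the log

        System.out.println(reachedByRecord(lastRecordOffset, stoppingOffset)); // false
        System.out.println(
                reachedByRecordOrPosition(lastRecordOffset, consumerPosition, stoppingOffset)); // true
    }
}
```

In the connector itself the position would come from *consumer.position(tp)*, as named in the proposal; where exactly the extra condition is evaluated is left open here.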

I would be happy to implement this fix if we can reach an agreement. 
Thanks

  was:
h2. Summary  
Hi, we have run into an issue with transactional messages and the Table API 
Kafka source. If we configure *'scan.bounded.mode'* to *'latest-offset'*, the 
Flink SQL request hangs and eventually times out. We can always reproduce this 
unexpected behavior by following the steps below.
This is also related to this 
[issue|https://issues.apache.org/jira/browse/FLINK-33484].

h2. How to reproduce
1. Deploy a transactional producer and stop it after it has produced a certain 
number of messages.
2. Configure *'scan.bounded.mode'* to *'latest-offset'* and submit a simple 
query, such as counting the produced messages.
3. The Flink SQL job gets stuck and times out.

h2. Cause
A transactional producer always writes [control 
records|https://kafka.apache.org/documentation/#controlbatch] at the end of a 
transaction. These control records are never returned by 
{*}consumer.poll(){*} (they are filtered out internally). The 
*KafkaPartitionSplitReader* code finishes a split only when the condition 
*lastRecord.offset() >= stoppingOffset - 1* is met. This works for 
non-transactional messages or in a streaming environment, but in some batch use 
cases it causes an unexpected hang.

h2. Proposed solution
Adding the condition *consumer.position(tp) >= stoppingOffset* to the if statement 
can solve the problem. 
With this change, KafkaPartitionSplitReader can finish the split even when the 
stopping offset is set to a control record's offset. 

I would be happy to implement this fix if we can reach an agreement. 
Thanks


> Transactional message + Table api kafka source with 'latest-offset' scan 
> bound mode causes indefinitely hanging
> ---------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-34470
>                 URL: https://issues.apache.org/jira/browse/FLINK-34470
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.17.1
>            Reporter: dongwoo.kim
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
