[ 
https://issues.apache.org/jira/browse/FLINK-35565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860998#comment-17860998
 ] 

dongwoo.kim commented on FLINK-35565:
-------------------------------------

Since the stoppingOffset logic relies on the last record's offset, it can fall 
into an infinite loop if all records have been deleted and no record is ever 
returned. 
This [PR|https://github.com/apache/flink-connector-kafka/pull/100] resolves the 
issue. Please take a look.
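
To illustrate the failure mode, here is a minimal sketch in plain Java with no
Kafka dependency. It is not the connector's actual code: the class and method
names are hypothetical, and it assumes the reader only marks a split as
finished when it sees a record at or past stoppingOffset - 1. It contrasts that
with one possible alternative, a check against the consumer's position.

```java
import java.util.List;

public class StoppingOffsetSketch {

    // Hypothetical check based on the last fetched record.
    // It can never fire when a poll returns no records.
    static boolean finishedByLastRecord(List<Long> fetchedOffsets,
                                        long stoppingOffset) {
        if (fetchedOffsets.isEmpty()) {
            return false; // nothing to inspect, split is never finished
        }
        long lastOffset = fetchedOffsets.get(fetchedOffsets.size() - 1);
        return lastOffset >= stoppingOffset - 1;
    }

    // Hypothetical check based on the consumer position (the next offset
    // to fetch). It does not depend on any record being returned.
    static boolean finishedByPosition(long consumerPosition,
                                      long stoppingOffset) {
        return consumerPosition >= stoppingOffset;
    }

    public static void main(String[] args) {
        long stoppingOffset = 15L;     // StoppingOffset from the issue's logs
        long positionAfterReset = 15L; // position after the offset reset
        List<Long> emptyPoll = List.of(); // the topic holds no records

        // prints "false": the record-based check never terminates the split
        System.out.println(finishedByLastRecord(emptyPoll, stoppingOffset));
        // prints "true": the position-based check terminates immediately
        System.out.println(finishedByPosition(positionAfterReset, stoppingOffset));
    }
}
```

With the values from the issue (position reset to 15, stopping offset 15), the
record-based check stays false on every empty poll, which matches the repeated
FetchTask runs in the logs below.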

> Flink KafkaSource Batch Job Gets Into Infinite Loop after Resetting Offset
> --------------------------------------------------------------------------
>
>                 Key: FLINK-35565
>                 URL: https://issues.apache.org/jira/browse/FLINK-35565
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: kafka-3.1.0
>         Environment: Reproduced on *Flink 1.18.1* with the latest 
> Kafka connector 3.1.0-1.18 on a session cluster.
>            Reporter: Naci Simsek
>            Priority: Major
>         Attachments: image-2024-06-11-11-19-09-889.png, 
> taskmanager_localhost_54489-ac092a_log.txt
>
>
> h2. Summary
> A Flink batch job gets into an infinite fetch loop and cannot finish 
> gracefully if the connected Kafka topic is empty and the starting offset 
> configured in the Flink job is lower than the current start/end offset of 
> the topic. See below for details:
> h2. How to reproduce
> A Flink +*batch*+ job uses a {*}KafkaSource{*} to consume events from a 
> Kafka topic.
> The Kafka topic is empty (it contains no events), and its current offset is 
> *15*, as shown below:
> !image-2024-06-11-11-19-09-889.png|width=895,height=256!
>  
> The Flink job uses a *specific starting offset* value that is +*less*+ than 
> the current offset of the topic/partition.
> In the code below, it is set to “4”:
>  
> {code:java}
> package naci.grpId;
> import org.apache.flink.api.common.RuntimeExecutionMode;
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.connector.kafka.source.KafkaSource;
> import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.kafka.common.TopicPartition;
> import java.util.HashMap;
> import java.util.Map;
> public class KafkaSource_Print {
>     public static void main(String[] args) throws Exception {
>         final StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
>         // Define the specific offsets for the partitions
>         Map<TopicPartition, Long> specificOffsets = new HashMap<>();
>         // Start from offset 4 for partition 0
>         specificOffsets.put(new TopicPartition("topic_test", 0), 4L);
>         KafkaSource<String> kafkaSource = KafkaSource
>                 .<String>builder()
>                 .setBootstrapServers("localhost:9093") // Make sure the port is correct
>                 .setTopics("topic_test")
>                 .setValueOnlyDeserializer(new SimpleStringSchema())
>                 .setStartingOffsets(OffsetsInitializer.offsets(specificOffsets))
>                 .setBounded(OffsetsInitializer.latest())
>                 .build();
>         DataStream<String> stream = env.fromSource(
>                 kafkaSource,
>                 WatermarkStrategy.noWatermarks(),
>                 "Kafka Source"
>         );
>         stream.print();
>         env.execute("Flink KafkaSource test job");
>     }
> }{code}
>  
>  
> Here are the initial offset-related logs, printed as soon as the job is 
> submitted:
>  
> {code:java}
> 2024-05-30 12:15:50,010 INFO  
> org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding 
> split(s) to reader: [[Partition: topic_test-0, StartingOffset: 4, 
> StoppingOffset: 15]]
> 2024-05-30 12:15:50,069 DEBUG 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
> Prepare to run AddSplitsTask: [[[Partition: topic_test-0, StartingOffset: 4, 
> StoppingOffset: 15]]]
> 2024-05-30 12:15:50,074 TRACE 
> org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader [] - 
> Seeking starting offsets to specified offsets: {topic_test-0=4}
> 2024-05-30 12:15:50,074 INFO  org.apache.kafka.clients.consumer.KafkaConsumer 
>              [] - [Consumer clientId=KafkaSource--2381765882724812354-0, 
> groupId=null] Seeking to offset 4 for partition topic_test-0
> 2024-05-30 12:15:50,075 DEBUG 
> org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader [] - 
> SplitsChange handling result: [topic_test-0, start:4, stop: 15]
> 2024-05-30 12:15:50,075 DEBUG 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
> Finished running task AddSplitsTask: [[[Partition: topic_test-0, 
> StartingOffset: 4, StoppingOffset: 15]]]
> 2024-05-30 12:15:50,075 DEBUG 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
> Prepare to run FetchTask{code}
>  
> Since the starting offset {color:#ff0000}*4*{color} is *out of range* for the 
> Kafka topic, KafkaConsumer initiates an {*}offset +reset+{*}, as seen in the 
> task manager logs:
>  
> {code:java}
> 2024-05-30 12:15:50,193 INFO  
> org.apache.kafka.clients.consumer.internals.Fetcher          [] - [Consumer 
> clientId=KafkaSource--2381765882724812354-0, groupId=null] Fetch position 
> FetchPosition{offset=4, offsetEpoch=Optional.empty, 
> currentLeader=LeaderAndEpoch{leader=Optional[nacis-mbp-m2:9093 (id: 1 rack: 
> null)], epoch=0}} is out of range for partition topic_test-0, resetting offset
> 2024-05-30 12:15:50,195 INFO  
> org.apache.kafka.clients.consumer.internals.SubscriptionState [] - [Consumer 
> clientId=KafkaSource--2381765882724812354-0, groupId=null] Resetting offset 
> for partition topic_test-0 to position FetchPosition{offset=15, 
> offsetEpoch=Optional.empty, 
> currentLeader=LeaderAndEpoch{leader=Optional[nacis-mbp-m2:9093 (id: 1 rack: 
> null)], epoch=0}}.{code}
>  
>  
> Then, an {color:#ff0000}*infinite {{FetchTask}} loop*{color} starts:
>  
> {code:java}
> 2024-05-30 12:16:00,079 DEBUG 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
> Finished running task FetchTask
> 2024-05-30 12:16:00,079 DEBUG 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
> Prepare to run FetchTask
> 2024-05-30 12:16:00,079 TRACE 
> org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Current 
> fetch is finished.
> 2024-05-30 12:16:00,080 TRACE 
> org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Source 
> reader status: NOTHING_AVAILABLE
> 2024-05-30 12:16:06,288 DEBUG 
> org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Received 
> heartbeat request from df54e7abdfa0095dc5c214b056153dea.
> 2024-05-30 12:16:08,755 DEBUG 
> org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Received 
> heartbeat request from e1746de110bfdd23c7dba50f3b083621.
> 2024-05-30 12:16:10,082 DEBUG 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
> Finished running task FetchTask
> 2024-05-30 12:16:10,082 DEBUG 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
> Prepare to run FetchTask
> 2024-05-30 12:16:10,082 TRACE 
> org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Getting 
> next source data batch from queue
> 2024-05-30 12:16:10,082 TRACE 
> org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Current 
> fetch is finished.
> 2024-05-30 12:16:10,082 TRACE 
> org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Source 
> reader status: NOTHING_AVAILABLE
> 2024-05-30 12:16:16,290 DEBUG 
> org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Received 
> heartbeat request from df54e7abdfa0095dc5c214b056153dea.
> 2024-05-30 12:16:17,393 DEBUG 
> org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Received 
> file upload request for file LOG
> 2024-05-30 12:16:17,394 DEBUG org.apache.flink.runtime.blob.BlobClient        
>              [] - PUT BLOB stream to /127.0.0.1:55663.
> 2024-05-30 12:16:18,757 DEBUG 
> org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Received 
> heartbeat request from e1746de110bfdd23c7dba50f3b083621.
> 2024-05-30 12:16:20,084 DEBUG 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
> Finished running task FetchTask
> 2024-05-30 12:16:20,084 TRACE 
> org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Getting 
> next source data batch from queue
> 2024-05-30 12:16:20,084 DEBUG 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
> Prepare to run FetchTask
> 2024-05-30 12:16:20,084 TRACE 
> org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Current 
> fetch is finished.
> 2024-05-30 12:16:20,084 TRACE 
> org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Source 
> reader status: NOTHING_AVAILABLE
> 2024-05-30 12:16:26,293 DEBUG 
> org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Received 
> heartbeat request from df54e7abdfa0095dc5c214b056153dea.
> 2024-05-30 12:16:28,761 DEBUG 
> org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Received 
> heartbeat request from e1746de110bfdd23c7dba50f3b083621.
> 2024-05-30 12:16:30,086 DEBUG 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
> Finished running task FetchTask
> 2024-05-30 12:16:30,086 TRACE 
> org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Getting 
> next source data batch from queue
> 2024-05-30 12:16:30,086 DEBUG 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
> Prepare to run FetchTask
> 2024-05-30 12:16:30,086 TRACE 
> org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Current 
> fetch is finished.
> 2024-05-30 12:16:30,086 TRACE 
> org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Source 
> reader status: NOTHING_AVAILABLE
> 2024-05-30 12:16:36,296 DEBUG 
> org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Received 
> heartbeat request from df54e7abdfa0095dc5c214b056153dea.
> 2024-05-30 12:16:38,762 DEBUG 
> org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Received 
> heartbeat request from e1746de110bfdd23c7dba50f3b083621.
> 2024-05-30 12:16:40,087 DEBUG 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
> Finished running task FetchTask
> 2024-05-30 12:16:40,087 TRACE 
> org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Getting 
> next source data batch from queue
> 2024-05-30 12:16:40,087 DEBUG 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
> Prepare to run FetchTask
> 2024-05-30 12:16:40,088 TRACE 
> org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Current 
> fetch is finished.
> 2024-05-30 12:16:40,088 TRACE 
> org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Source 
> reader status: NOTHING_AVAILABLE
> 2024-05-30 12:16:46,297 DEBUG 
> org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Received 
> heartbeat request from df54e7abdfa0095dc5c214b056153dea.
> 2024-05-30 12:16:48,765 DEBUG 
> org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Received 
> heartbeat request from e1746de110bfdd23c7dba50f3b083621.
> 2024-05-30 12:16:50,089 DEBUG 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
> Finished running task FetchTask{code}
>  
> The loop *ends* as soon as I *add* a *new event* to this Kafka topic, which 
> is placed at offset 15.
> {+}*Expected Result*{+}: Since this is a batch job and there are no events 
> on the Kafka topic, the Flink connector should recognize right after the 
> offset reset that there are no events to process, and gracefully finish the 
> application.
> {+}*Actual Result*{+}: The Flink connector keeps trying to fetch an event 
> from offset 15, which is a valid offset but holds no event, so the 
> application keeps fetching that same offset indefinitely!
>  
> This issue does +*NOT*+ happen if the above Flink application sets a 
> *starting offset* of +*15*+ or {+}*higher*{+}! If it is set to 15 or higher, 
> no offset reset is performed, and the Flink application finishes gracefully!
> This was reproduced on *Flink 1.18.1* with the latest Kafka connector 
> 3.1.0-1.18 on a session cluster.
> Logs are attached.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
