[ https://issues.apache.org/jira/browse/FLINK-35565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860130#comment-17860130 ]
Muhammet Orazov commented on FLINK-35565:
-----------------------------------------

The solution for https://issues.apache.org/jira/browse/FLINK-34470 could also work for this issue.

> Flink KafkaSource Batch Job Gets Into Infinite Loop after Resetting Offset
> --------------------------------------------------------------------------
>
>                 Key: FLINK-35565
>                 URL: https://issues.apache.org/jira/browse/FLINK-35565
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: kafka-3.1.0
>         Environment: This was reproduced on *Flink 1.18.1* with the latest
> Kafka connector 3.1.0-1.18 on a session cluster.
>            Reporter: Naci Simsek
>            Priority: Major
>         Attachments: image-2024-06-11-11-19-09-889.png, taskmanager_localhost_54489-ac092a_log.txt
>
>
> h2. Summary
> A Flink batch job gets into an infinite fetch loop and cannot finish
> gracefully if the connected Kafka topic is empty and the starting offset
> configured in the Flink job is lower than the current start/end offset of
> that topic. See below for details:
> h2. How to reproduce
> A Flink +*batch*+ job uses a {*}KafkaSource{*} to consume events from a
> Kafka topic.
> The Kafka topic is empty (it contains no events), and its offset value is
> *15*, as shown below:
> !image-2024-06-11-11-19-09-889.png|width=895,height=256!
>
> The Flink job uses a *specific starting offset* value that is +*less*+ than
> the current offset of the topic/partition.
> In the code below, it is set to “4”:
>
> {code:java}
> package naci.grpId;
>
> import org.apache.flink.api.common.RuntimeExecutionMode;
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.connector.kafka.source.KafkaSource;
> import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.kafka.common.TopicPartition;
>
> import java.util.HashMap;
> import java.util.Map;
>
> public class KafkaSource_Print {
>     public static void main(String[] args) throws Exception {
>         final StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
>
>         // Define the specific offsets for the partitions
>         Map<TopicPartition, Long> specificOffsets = new HashMap<>();
>         specificOffsets.put(new TopicPartition("topic_test", 0), 4L); // Start from offset 4 for partition 0
>
>         KafkaSource<String> kafkaSource = KafkaSource
>                 .<String>builder()
>                 .setBootstrapServers("localhost:9093") // Make sure the port is correct
>                 .setTopics("topic_test")
>                 .setValueOnlyDeserializer(new SimpleStringSchema())
>                 // Start each partition at the offset given in specificOffsets
>                 .setStartingOffsets(OffsetsInitializer.offsets(specificOffsets))
>                 // Bounded source: stop at the latest offset (StoppingOffset: 15 in the logs below)
>                 .setBounded(OffsetsInitializer.latest())
>                 .build();
>
>         DataStream<String> stream = env.fromSource(
>                 kafkaSource,
>                 WatermarkStrategy.noWatermarks(),
>                 "Kafka Source"
>         );
>         stream.print();
>         env.execute("Flink KafkaSource test job");
>     }
> }{code}
>
> Here are the initial offset-related logs, printed as soon as the job is
> submitted:
>
> {code:java}
> 2024-05-30 12:15:50,010 INFO org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding split(s) to reader: [[Partition: topic_test-0, StartingOffset: 4, StoppingOffset: 15]]
> 2024-05-30 12:15:50,069 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Prepare to run AddSplitsTask: [[[Partition: topic_test-0, StartingOffset: 4, StoppingOffset: 15]]]
> 2024-05-30 12:15:50,074 TRACE org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader [] - Seeking starting offsets to specified offsets: {topic_test-0=4}
> 2024-05-30 12:15:50,074 INFO org.apache.kafka.clients.consumer.KafkaConsumer [] - [Consumer clientId=KafkaSource--2381765882724812354-0, groupId=null] Seeking to offset 4 for partition topic_test-0
> 2024-05-30 12:15:50,075 DEBUG org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader [] - SplitsChange handling result: [topic_test-0, start:4, stop: 15]
> 2024-05-30 12:15:50,075 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished running task AddSplitsTask: [[[Partition: topic_test-0, StartingOffset: 4, StoppingOffset: 15]]]
> 2024-05-30 12:15:50,075 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Prepare to run FetchTask{code}
>
> Since the starting offset {color:#ff0000}*4*{color} is *out of range* for the
> Kafka topic, KafkaConsumer initiates an {*}offset +reset+{*}, as seen in the
> task manager logs:
>
> {code:java}
> 2024-05-30 12:15:50,193 INFO org.apache.kafka.clients.consumer.internals.Fetcher [] - [Consumer clientId=KafkaSource--2381765882724812354-0, groupId=null] Fetch position FetchPosition{offset=4, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nacis-mbp-m2:9093 (id: 1 rack: null)], epoch=0}} is out of range for partition topic_test-0, resetting offset
> 2024-05-30 12:15:50,195 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState [] - [Consumer clientId=KafkaSource--2381765882724812354-0, groupId=null] Resetting offset for partition topic_test-0 to position FetchPosition{offset=15, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nacis-mbp-m2:9093 (id: 1 rack: null)], epoch=0}}.{code}
>
>
> Then, an {color:#ff0000}*infinite {{FetchTask}} loop*{color} starts:
>
> {code:java}
> 2024-05-30 12:16:00,079 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished running task FetchTask
> 2024-05-30 12:16:00,079 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Prepare to run FetchTask
> 2024-05-30 12:16:00,079 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Current fetch is finished.
> 2024-05-30 12:16:00,080 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Source reader status: NOTHING_AVAILABLE
> 2024-05-30 12:16:06,288 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received heartbeat request from df54e7abdfa0095dc5c214b056153dea.
> 2024-05-30 12:16:08,755 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received heartbeat request from e1746de110bfdd23c7dba50f3b083621.
> 2024-05-30 12:16:10,082 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished running task FetchTask
> 2024-05-30 12:16:10,082 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Prepare to run FetchTask
> 2024-05-30 12:16:10,082 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Getting next source data batch from queue
> 2024-05-30 12:16:10,082 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Current fetch is finished.
> 2024-05-30 12:16:10,082 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Source reader status: NOTHING_AVAILABLE
> 2024-05-30 12:16:16,290 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received heartbeat request from df54e7abdfa0095dc5c214b056153dea.
> 2024-05-30 12:16:17,393 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received file upload request for file LOG
> 2024-05-30 12:16:17,394 DEBUG org.apache.flink.runtime.blob.BlobClient [] - PUT BLOB stream to /127.0.0.1:55663.
> 2024-05-30 12:16:18,757 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received heartbeat request from e1746de110bfdd23c7dba50f3b083621.
> 2024-05-30 12:16:20,084 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished running task FetchTask
> 2024-05-30 12:16:20,084 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Getting next source data batch from queue
> 2024-05-30 12:16:20,084 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Prepare to run FetchTask
> 2024-05-30 12:16:20,084 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Current fetch is finished.
> 2024-05-30 12:16:20,084 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Source reader status: NOTHING_AVAILABLE
> 2024-05-30 12:16:26,293 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received heartbeat request from df54e7abdfa0095dc5c214b056153dea.
> 2024-05-30 12:16:28,761 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received heartbeat request from e1746de110bfdd23c7dba50f3b083621.
> 2024-05-30 12:16:30,086 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished running task FetchTask
> 2024-05-30 12:16:30,086 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Getting next source data batch from queue
> 2024-05-30 12:16:30,086 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Prepare to run FetchTask
> 2024-05-30 12:16:30,086 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Current fetch is finished.
> 2024-05-30 12:16:30,086 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Source reader status: NOTHING_AVAILABLE
> 2024-05-30 12:16:36,296 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received heartbeat request from df54e7abdfa0095dc5c214b056153dea.
> 2024-05-30 12:16:38,762 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received heartbeat request from e1746de110bfdd23c7dba50f3b083621.
> 2024-05-30 12:16:40,087 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished running task FetchTask
> 2024-05-30 12:16:40,087 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Getting next source data batch from queue
> 2024-05-30 12:16:40,087 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Prepare to run FetchTask
> 2024-05-30 12:16:40,088 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Current fetch is finished.
> 2024-05-30 12:16:40,088 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Source reader status: NOTHING_AVAILABLE
> 2024-05-30 12:16:46,297 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received heartbeat request from df54e7abdfa0095dc5c214b056153dea.
> 2024-05-30 12:16:48,765 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received heartbeat request from e1746de110bfdd23c7dba50f3b083621.
> 2024-05-30 12:16:50,089 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished running task FetchTask{code}
>
> The loop *ends* as soon as I *add* a *new event* to this Kafka topic, which
> is placed at offset 15.
> {+}*Expected Result*{+}: Since this is a batch job and there are no events on
> the Kafka topic, the Flink connector should recognize, right after the offset
> reset, that there are no events to process and finish the application
> gracefully.
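The gap between the expected result and the observed loop can be illustrated with a deliberately simplified model. Everything in it is hypothetical: `BoundedSplitModel` and `fetchesUntilFinished` are invented names, and this is not the connector's `KafkaPartitionSplitReader` code. It assumes (a) that a split whose starting offset is already at or past its stopping offset is finished on assignment, which would explain why starting at 15 works, and (b) that the out-of-range position 4 is reset to the earliest offset 15, as in the `SubscriptionState` log line. It then compares two possible stop checks: one that only fires when a polled record reaches `stop - 1`, and one that compares the consumer position with the stopping offset.

```java
// Hypothetical, simplified model of one bounded Kafka split. This is NOT the
// connector's KafkaPartitionSplitReader, only an illustration of two stop checks.
public class BoundedSplitModel {

    /**
     * Simulates repeated fetches of one split. Returns the fetch number after which
     * the split counts as finished, 0 if it is finished on assignment, or -1 if it is
     * still unfinished after maxFetches (the "infinite loop" case).
     * Offsets in [earliest, latest) hold records; the stopping offset is exclusive.
     */
    static int fetchesUntilFinished(long start, long stop, long earliest, long latest,
                                    boolean positionBasedCheck, int maxFetches) {
        // Assumption: a split that starts at or after its stopping offset is treated
        // as empty and finished right away.
        if (start >= stop) {
            return 0;
        }
        long position = start;
        // Assumption: an out-of-range position is reset to the earliest offset,
        // like "Resetting offset ... to position FetchPosition{offset=15" in the logs.
        if (position < earliest || position > latest) {
            position = earliest;
        }
        for (int fetch = 1; fetch <= maxFetches; fetch++) {
            // "Poll": the consumer returns every record in [position, latest).
            boolean polledAny = false;
            long lastPolledOffset = -1;
            while (position < latest) {
                polledAny = true;
                lastPolledOffset = position;
                position++;
            }
            boolean finished = positionBasedCheck
                    ? position >= stop                            // compares consumer position
                    : polledAny && lastPolledOffset >= stop - 1;  // needs a record to arrive
            if (finished) {
                return fetch;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Scenario from the report: start 4, stop 15, empty topic with earliest = latest = 15.
        System.out.println(fetchesUntilFinished(4, 15, 15, 15, false, 1_000)); // record-based check
        System.out.println(fetchesUntilFinished(4, 15, 15, 15, true, 1_000));  // position-based check
        // Starting offset 15 (split empty on assignment).
        System.out.println(fetchesUntilFinished(15, 15, 15, 15, false, 1_000));
        // One new event written at offset 15, record-based check.
        System.out.println(fetchesUntilFinished(4, 15, 15, 16, false, 1_000));
    }
}
```

Running `main` prints `-1`, `1`, `0` and `1` on separate lines. Under these assumptions, the record-based check never fires for the empty topic, which is consistent with the endless `FetchTask` loop and with the loop ending once an event lands at offset 15; the position-based check finishes after the first fetch. Whether the real connector uses exactly this kind of check is an assumption of the model.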
> {+}*Actual Result*{+}: The Flink connector endlessly tries to fetch an event
> from offset 15. That offset is valid, but no event exists there yet, so the
> application keeps fetching the same offset!
>
> This issue does +*NOT*+ occur if the above Flink application sets a
> *starting offset* of +*15*+ or {+}*higher*{+}! In that case, no offset reset
> is performed and the Flink application finishes gracefully!
> This was reproduced on *Flink 1.18.1* with the latest Kafka connector
> 3.1.0-1.18 on a session cluster.
> Logs are attached.
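Based on the observation that a starting offset of 15 or higher finishes gracefully, one possible job-side workaround is to never request a starting offset below the partition's earliest available offset, so the consumer has nothing to reset. The block below is a hypothetical sketch of only the clamping step (`ClampStartingOffsets` and `clampToEarliest` are invented names; this is not a connector fix and not necessarily what FLINK-34470 does). It is generic over the key type so it runs without Kafka on the classpath. In a real job the keys would be `TopicPartition`s, the `earliest` map could be filled from Kafka's `Admin#listOffsets` with `OffsetSpec.earliest()`, and the result would be passed to `OffsetsInitializer.offsets(...)` as in the job above.

```java
import java.util.HashMap;
import java.util.Map;

public class ClampStartingOffsets {

    /**
     * Hypothetical helper: raises every requested starting offset to the earliest
     * offset still available in its partition. Partitions missing from
     * {@code earliest} keep their requested offset unchanged.
     */
    static <K> Map<K, Long> clampToEarliest(Map<K, Long> requested, Map<K, Long> earliest) {
        Map<K, Long> clamped = new HashMap<>();
        for (Map.Entry<K, Long> e : requested.entrySet()) {
            long start = e.getValue();
            Long earliestOffset = earliest.get(e.getKey());
            clamped.put(e.getKey(),
                    earliestOffset == null ? start : Math.max(start, earliestOffset));
        }
        return clamped;
    }

    public static void main(String[] args) {
        // Values from the report: requested offset 4, earliest available offset 15.
        Map<String, Long> requested = Map.of("topic_test-0", 4L);
        Map<String, Long> earliest = Map.of("topic_test-0", 15L);
        System.out.println(clampToEarliest(requested, earliest)); // prints {topic_test-0=15}
    }
}
```

With the report's values the starting offset becomes 15, equal to the stopping offset, which is the configuration the reporter describes as finishing gracefully. The offset lookup and the first fetch are not atomic, so retention can still advance the earliest offset in between; the clamp narrows that window rather than closing it.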