Hi Paimon Community:
Currently, if the user sets neither scan.startup.mode nor
properties.auto.offset.reset, the default is group-offsets, which causes
the following exception:
2025-02-12 16:52:37,248 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: Kafka Source -> Parse -> Side Output -> (Schema Evolution, Case-insensitive Convert -> Writer : orders_table -> Compact Coordinator: orders_table) (1/1)#0 (a8f049d07ffb31c14b67745b764bf8ef_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from RUNNING to FAILED with failure cause:
java.lang.RuntimeException: One or more fetchers have encountered exception
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:333) ~[flink-connector-files-1.19.1.jar:1.19.1]
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:228) ~[flink-connector-files-1.19.1.jar:1.19.1]
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:190) ~[flink-connector-files-1.19.1.jar:1.19.1]
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:422) ~[flink-dist-1.19.1.jar:1.19.1]
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-dist-1.19.1.jar:1.19.1]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.19.1.jar:1.19.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:579) ~[flink-dist-1.19.1.jar:1.19.1]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-dist-1.19.1.jar:1.19.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:909) ~[flink-dist-1.19.1.jar:1.19.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:858) ~[flink-dist-1.19.1.jar:1.19.1]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) ~[flink-dist-1.19.1.jar:1.19.1]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) [flink-dist-1.19.1.jar:1.19.1]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751) [flink-dist-1.19.1.jar:1.19.1]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) [flink-dist-1.19.1.jar:1.19.1]
    at java.lang.Thread.run(Thread.java:750) [?:1.8.0_432]
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:168) ~[flink-connector-files-1.19.1.jar:1.19.1]
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117) ~[flink-connector-files-1.19.1.jar:1.19.1]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_432]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_432]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_432]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_432]
    ... 1 more
Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partitions: [test_kafka_offset-0]
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.SubscriptionState.resetInitializingPositions(SubscriptionState.java:712) ~[flink-sql-connector-kafka-3.2.0-1.19.jar:3.2.0-1.19]
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:2461) ~[flink-sql-connector-kafka-3.2.0-1.19.jar:3.2.0-1.19]
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1758) ~[flink-sql-connector-kafka-3.2.0-1.19.jar:3.2.0-1.19]
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1717) ~[flink-sql-connector-kafka-3.2.0-1.19.jar:3.2.0-1.19]
    at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.lambda$removeEmptySplits$4(KafkaPartitionSplitReader.java:375) ~[flink-sql-connector-kafka-3.2.0-1.19.jar:3.2.0-1.19]
    at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.retryOnWakeup(KafkaPartitionSplitReader.java:481) ~[flink-sql-connector-kafka-3.2.0-1.19.jar:3.2.0-1.19]
    at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.removeEmptySplits(KafkaPartitionSplitReader.java:374) ~[flink-sql-connector-kafka-3.2.0-1.19.jar:3.2.0-1.19]
    at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.handleSplitsChanges(KafkaPartitionSplitReader.java:224) ~[flink-sql-connector-kafka-3.2.0-1.19.jar:3.2.0-1.19]
    at org.apache.flink.connector.base.source.reader.fetcher.AddSplitsTask.run(AddSplitsTask.java:51) ~[flink-connector-files-1.19.1.jar:1.19.1]
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165) ~[flink-connector-files-1.19.1.jar:1.19.1]
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117) ~[flink-connector-files-1.19.1.jar:1.19.1]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_432]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_432]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_432]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_432]
    ... 1 more
But we expect the following behavior instead:

Situation 1: if the user does not set --kafka_conf scan.startup.mode,
then earliest-offset should be used by default.

Situation 2: if the user does not set --kafka_conf
properties.auto.offset.reset, then earliest should be used by default.
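The defaulting step for the two situations above could be sketched roughly as follows. The option keys are the ones quoted in this report; the helper class and method names are hypothetical, not Paimon's actual code:

```java
import java.util.Properties;

public class KafkaStartupDefaults {

    // Hypothetical sketch: fill in defaults for options the user did not set,
    // so the Kafka consumer never starts with group-offsets and no reset policy.
    public static Properties withDefaults(Properties userConf) {
        Properties conf = new Properties();
        conf.putAll(userConf);
        // Situation 1: default the startup mode to earliest-offset.
        conf.putIfAbsent("scan.startup.mode", "earliest-offset");
        // Situation 2: default the consumer reset policy to earliest.
        conf.putIfAbsent("properties.auto.offset.reset", "earliest");
        return conf;
    }
}
```

putIfAbsent keeps any value the user passed via --kafka_conf, so explicit settings such as scan.startup.mode=group-offsets would still be honored.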
Best,
Jeff Yang