[ https://issues.apache.org/jira/browse/FLINK-24605?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17433530#comment-17433530 ]
Qingsheng Ren commented on FLINK-24605:
---------------------------------------

Thanks for reporting this, [~a_talukdar]! I think it's a bug in {{KafkaDynamicSource}}: it doesn't check the {{auto.offset.reset}} property when building the {{KafkaSource}}, so the offset reset strategy is overwritten with "none". I can submit a patch for this.

> org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partitions
> ---------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-24605
>                 URL: https://issues.apache.org/jira/browse/FLINK-24605
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.14.0
>            Reporter: Abhijit Talukdar
>            Priority: Major
>
> Getting the issue below when using 'scan.startup.mode' = 'group-offsets'.
>
> WITH (
>   'connector' = 'kafka',
>   'topic' = 'ss7gsm-signaling-event',
>   'properties.bootstrap.servers' = '******:9093',
>   'properties.group.id' = 'ss7gsm-signaling-event-T5',
>   'value.format' = 'avro-confluent',
>   'value.avro-confluent.schema-registry.url' = 'https://***:9099',
>   {color:#ff8b00}'scan.startup.mode' = 'group-offsets',{color}
>   {color:#ff8b00}'properties.auto.offset.reset' = 'earliest',{color}
>   'properties.security.protocol' = 'SASL_SSL',
>   'properties.ssl.truststore.location' = '/*/*/ca-certs.jks',
>   'properties.ssl.truststore.password' = '*****',
>   'properties.sasl.kerberos.service.name' = 'kafka'
> )
>
> 'ss7gsm-signaling-event-T5' is a new group id. If the group id is present in ZK then it works; otherwise the exception below is thrown.
> The 'properties.auto.offset.reset' property is ignored.
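To illustrate the missing check described in the comment above, here is a minimal, dependency-free sketch of how a source could derive its reset strategy from the user-supplied consumer properties instead of always falling back to "none". It is not the actual Flink patch: the class {{OffsetResetResolver}}, the nested {{ResetStrategy}} enum (a stand-in for Kafka's {{OffsetResetStrategy}}), and the {{resolve}} method are hypothetical names introduced only for this example.

```java
import java.util.Locale;
import java.util.Properties;

/** Hypothetical helper: resolves the reset strategy from consumer properties. */
final class OffsetResetResolver {

    /** Stand-in for Kafka's OffsetResetStrategy, so the sketch needs no dependencies. */
    enum ResetStrategy { EARLIEST, LATEST, NONE }

    /** The Kafka consumer property key that the report says is being ignored. */
    static final String AUTO_OFFSET_RESET = "auto.offset.reset";

    /**
     * Returns the strategy configured under auto.offset.reset, or NONE when the
     * property is absent. Unknown values are rejected instead of silently ignored.
     */
    static ResetStrategy resolve(Properties consumerProps) {
        String configured = consumerProps.getProperty(AUTO_OFFSET_RESET);
        if (configured == null) {
            // Assumption: with no user setting, keep the "none" behaviour seen in the log.
            return ResetStrategy.NONE;
        }
        try {
            return ResetStrategy.valueOf(configured.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException(
                    "Unsupported value for " + AUTO_OFFSET_RESET + ": " + configured, e);
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(AUTO_OFFSET_RESET, "earliest");
        System.out.println(resolve(props));            // strategy taken from the user setting
        System.out.println(resolve(new Properties())); // fallback when nothing is configured
    }
}
```

With the table options from the report ('properties.auto.offset.reset' = 'earliest'), a check along these lines would yield EARLIEST for a group id that has no committed offsets, rather than the "none" shown in the ConsumerConfig dump.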
>
> 2021-10-20 22:18:28,267 INFO org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerConfig [] - ConsumerConfig values:
>     allow.auto.create.topics = false
>     auto.commit.interval.ms = 5000
>     {color:#FF0000}+*auto.offset.reset = none*+{color}
>     bootstrap.servers = [xxxx.xxx.com:9093]
>
> Exception:
>
> 2021-10-20 22:18:28,620 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: KafkaSource-hiveSignaling.signaling_stg.ss7gsm_signaling_event_flink_k -> Sink: Collect table sink (1/1) (89b175333242fab8914271ad7638ba92) switched from INITIALIZING to RUNNING.
> 2021-10-20 22:18:28,621 INFO org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator [] - Assigning splits to readers {0=[[Partition: ss7gsm-signaling-event-2, StartingOffset: -3, StoppingOffset: -9223372036854775808], [Partition: ss7gsm-signaling-event-8, StartingOffset: -3, StoppingOffset: -9223372036854775808], [Partition: ss7gsm-signaling-event-7, StartingOffset: -3, StoppingOffset: -9223372036854775808], [Partition: ss7gsm-signaling-event-9, StartingOffset: -3, StoppingOffset: -9223372036854775808], [Partition: ss7gsm-signaling-event-5, StartingOffset: -3, StoppingOffset: -9223372036854775808], [Partition: ss7gsm-signaling-event-6, StartingOffset: -3, StoppingOffset: -9223372036854775808], [Partition: ss7gsm-signaling-event-0, StartingOffset: -3, StoppingOffset: -9223372036854775808], [Partition: ss7gsm-signaling-event-4, StartingOffset: -3, StoppingOffset: -9223372036854775808], [Partition: ss7gsm-signaling-event-1, StartingOffset: -3, StoppingOffset: -9223372036854775808], [Partition: ss7gsm-signaling-event-3, StartingOffset: -3, StoppingOffset: -9223372036854775808]]}
> 2021-10-20 22:18:28,716 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: KafkaSource-hiveSignaling.signaling_stg.ss7gsm_signaling_event_flink_k -> Sink: Collect table sink (1/1) (89b175333242fab8914271ad7638ba92) switched from RUNNING to FAILED on xx.xxx.xxx.xxx:42075-d80607 @ xxxxxx.xxx.com (dataPort=34120).
> java.lang.RuntimeException: One or more fetchers have encountered exception
>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225) ~[flink-table_2.11-1.14.0.jar:1.14.0]
>     at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169) ~[flink-table_2.11-1.14.0.jar:1.14.0]
>     at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130) ~[flink-table_2.11-1.14.0.jar:1.14.0]
>     at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:342) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>     at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>     at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_232]
> Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150) ~[flink-table_2.11-1.14.0.jar:1.14.0]
>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105) ~[flink-table_2.11-1.14.0.jar:1.14.0]
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_232]
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_232]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_232]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_232]

--
This message was sent by Atlassian Jira
(v8.3.4#803005)