[ 
https://issues.apache.org/jira/browse/FLINK-24605?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17433530#comment-17433530
 ] 

Qingsheng Ren commented on FLINK-24605:
---------------------------------------

Thanks for reporting this, [~a_talukdar]! I think it's a bug in 
{{KafkaDynamicSource}}: it doesn't check the {{auto.offset.reset}} property when 
building the {{KafkaSource}}, so the offset reset strategy is overwritten to "none". 
I can submit a patch for this.
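
To illustrate the idea, here is a minimal sketch of how the reset strategy could be resolved from the consumer properties instead of being hard-coded. This is not the actual patch: {{OffsetResetResolver}} and {{ResetStrategy}} are hypothetical names (the enum only stands in for Kafka's {{OffsetResetStrategy}} so the example compiles without dependencies), and falling back to "none" when the property is absent is an assumption made to keep the behaviour seen in 1.14.0.

```java
import java.util.Locale;
import java.util.Properties;

/** Hypothetical helper for illustration; not a class from the Flink code base. */
final class OffsetResetResolver {

    /** Stand-in for Kafka's OffsetResetStrategy enum, so the sketch has no dependencies. */
    enum ResetStrategy { EARLIEST, LATEST, NONE }

    /** Kafka consumer key; in Flink SQL it is set as 'properties.auto.offset.reset'. */
    static final String AUTO_OFFSET_RESET = "auto.offset.reset";

    private OffsetResetResolver() {}

    /**
     * Returns the strategy named by the user's property.
     * Assumption: when the property is missing, fall back to NONE.
     */
    static ResetStrategy resolve(Properties consumerProps) {
        String raw = consumerProps.getProperty(AUTO_OFFSET_RESET);
        if (raw == null) {
            return ResetStrategy.NONE;
        }
        try {
            // Accept "earliest", "Earliest", " EARLIEST " and so on.
            return ResetStrategy.valueOf(raw.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException(
                    "Unsupported " + AUTO_OFFSET_RESET + " value: '" + raw + "'", e);
        }
    }
}
```

In the connector, the resolved value would presumably be handed to {{OffsetsInitializer.committedOffsets(OffsetResetStrategy)}} when the startup mode is 'group-offsets', so that a new group id falls back to the configured strategy rather than failing.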

> org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.NoOffsetForPartitionException:
>  Undefined offset with no reset policy for partitions
> ---------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-24605
>                 URL: https://issues.apache.org/jira/browse/FLINK-24605
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.14.0
>            Reporter: Abhijit Talukdar
>            Priority: Major
>
> I get the issue below when using 'scan.startup.mode' = 'group-offsets'.
>  
> WITH (
>  'connector' = 'kafka',
>  'topic' = 'ss7gsm-signaling-event',
>  'properties.bootstrap.servers' = '******:9093',
>  'properties.group.id' = 'ss7gsm-signaling-event-T5',
>  'value.format' = 'avro-confluent',
>  'value.avro-confluent.schema-registry.url' = 'https://***:9099',
>  {color:#ff8b00}'scan.startup.mode' = 'group-offsets',{color}
> {color:#ff8b00} 'properties.auto.offset.reset' = 'earliest',{color}
>  'properties.security.protocol'= 'SASL_SSL',
>  'properties.ssl.truststore.location'= '/*/*/ca-certs.jks',
>  'properties.ssl.truststore.password'= '*****',
>  'properties.sasl.kerberos.service.name'= 'kafka'
> )
>  
> 'ss7gsm-signaling-event-T5' is a new group id. If the group id is already 
> present in ZK, it works; otherwise the exception below is thrown. 
> The 'properties.auto.offset.reset' property is ignored.
>  
> 2021-10-20 22:18:28,267 INFO  
> org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerConfig
>  [] - ConsumerConfig values: 
> allow.auto.create.topics = false
> auto.commit.interval.ms = 5000
> {color:#FF0000} +*auto.offset.reset = none*+{color}
> bootstrap.servers = [xxxx.xxx.com:9093]
>  
>  
> Exception:
>  
> 2021-10-20 22:18:28,620 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: KafkaSource-hiveSignaling.signaling_stg.ss7gsm_signaling_event_flink_k -> Sink: Collect table sink (1/1) (89b175333242fab8914271ad7638ba92) switched from INITIALIZING to RUNNING.
> 2021-10-20 22:18:28,621 INFO  org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator [] - Assigning splits to readers \{0=[
>   [Partition: ss7gsm-signaling-event-2, StartingOffset: -3, StoppingOffset: -9223372036854775808],
>   [Partition: ss7gsm-signaling-event-8, StartingOffset: -3, StoppingOffset: -9223372036854775808],
>   [Partition: ss7gsm-signaling-event-7, StartingOffset: -3, StoppingOffset: -9223372036854775808],
>   [Partition: ss7gsm-signaling-event-9, StartingOffset: -3, StoppingOffset: -9223372036854775808],
>   [Partition: ss7gsm-signaling-event-5, StartingOffset: -3, StoppingOffset: -9223372036854775808],
>   [Partition: ss7gsm-signaling-event-6, StartingOffset: -3, StoppingOffset: -9223372036854775808],
>   [Partition: ss7gsm-signaling-event-0, StartingOffset: -3, StoppingOffset: -9223372036854775808],
>   [Partition: ss7gsm-signaling-event-4, StartingOffset: -3, StoppingOffset: -9223372036854775808],
>   [Partition: ss7gsm-signaling-event-1, StartingOffset: -3, StoppingOffset: -9223372036854775808],
>   [Partition: ss7gsm-signaling-event-3, StartingOffset: -3, StoppingOffset: -9223372036854775808]]}
> 2021-10-20 22:18:28,716 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: KafkaSource-hiveSignaling.signaling_stg.ss7gsm_signaling_event_flink_k -> Sink: Collect table sink (1/1) (89b175333242fab8914271ad7638ba92) switched from RUNNING to FAILED on xx.xxx.xxx.xxx:42075-d80607 @ xxxxxx.xxx.com (dataPort=34120).
> java.lang.RuntimeException: One or more fetchers have encountered exception
>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225) ~[flink-table_2.11-1.14.0.jar:1.14.0]
>     at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169) ~[flink-table_2.11-1.14.0.jar:1.14.0]
>     at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130) ~[flink-table_2.11-1.14.0.jar:1.14.0]
>     at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:342) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>     at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>     at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_232]
> Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150) ~[flink-table_2.11-1.14.0.jar:1.14.0]
>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105) ~[flink-table_2.11-1.14.0.jar:1.14.0]
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_232]
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_232]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_232]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_232]
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
