[ https://issues.apache.org/jira/browse/FLINK-24851 ]
liwei li deleted comment on FLINK-24851:
----------------------------------

    was (Author: liliwei):
Also, can I know the background for doing this? Why override this value artificially? Or maybe it's just a bug?

> KafkaSourceBuilder: auto.offset.reset is ignored
> ------------------------------------------------
>
>                 Key: FLINK-24851
>                 URL: https://issues.apache.org/jira/browse/FLINK-24851
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.14.0
>            Reporter: Arseniy Tashoyan
>            Assignee: liwei li
>            Priority: Major
>
> Creating KafkaSource like this:
> {code:scala}
> val props = new Properties()
> props.put("bootstrap.servers", "localhost:9092")
> props.put("group.id", "group1")
> props.put("auto.offset.reset", "latest")
> val kafkaSource = KafkaSource.builder[String]()
>   .setProperties(props)
>   .build()
> {code}
> The actually used value for _"auto.offset.reset"_ is *"earliest"* instead of
> configured *"latest"*.
> This occurs because _"auto.offset.reset"_ gets overridden by
> _startingOffsetsInitializer.getAutoOffsetResetStrategy().name().toLowerCase()_.
> The default value for _startingOffsetsInitializer_ is _"earliest"_.
> This behavior is misleading.
> This behavior imposes an inconvenience on configuring the Kafka connector. We
> cannot use the Kafka setting _"auto.offset.reset"_ as-is. Instead we must
> extract this particular setting from other settings and propagate to
> _KafkaSourceBuilder.setStartingOffsets()_:
> {code:scala}
> val kafkaSource = KafkaSource.builder[String]()
>   .setProperties(props)
>   .setStartingOffsets(
>     OffsetsInitializer.committedOffsets(
>       OffsetResetStrategy.valueOf(
>         props.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
>           .asInstanceOf[String]
>           .toUpperCase(Locale.ROOT)
>       )
>     )
>   )
>   .build()
> {code}
> The expected behavior is to use the value of _"auto.offset.reset"_ provided
> by _KafkaSourceBuilder.setProperties()_ - unless overridden via
> _KafkaSourceBuilder.setStartingOffsets()_.

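The precedence the issue asks for (an explicit setStartingOffsets() choice first, then the user-supplied "auto.offset.reset", then the "earliest" default) can be sketched without any Flink or Kafka dependency. This is only an illustration of the expected ordering, not Flink's actual implementation: ResetStrategy, OffsetResetPrecedence, parse and resolve are hypothetical names invented for this sketch, and falling back to the default on an unrecognised property value is an assumption.

```scala
import java.util.{Locale, Properties}

// Hypothetical stand-in for Kafka's OffsetResetStrategy (not the real enum).
sealed trait ResetStrategy
object ResetStrategy {
  case object Earliest extends ResetStrategy
  case object Latest extends ResetStrategy
  case object NoReset extends ResetStrategy // models the Kafka value "none"
}

// Hypothetical helper illustrating the precedence requested in the issue.
object OffsetResetPrecedence {
  val AutoOffsetReset = "auto.offset.reset"

  // Parse the property value case-insensitively; unknown values yield None.
  def parse(value: String): Option[ResetStrategy] =
    value.trim.toLowerCase(Locale.ROOT) match {
      case "earliest" => Some(ResetStrategy.Earliest)
      case "latest"   => Some(ResetStrategy.Latest)
      case "none"     => Some(ResetStrategy.NoReset)
      case _          => None
    }

  // 1. a strategy set explicitly (models setStartingOffsets) wins;
  // 2. otherwise the value from the user's properties is used;
  // 3. otherwise fall back to Earliest, the default named in the issue.
  def resolve(props: Properties, explicit: Option[ResetStrategy]): ResetStrategy =
    explicit
      .orElse(Option(props.getProperty(AutoOffsetReset)).flatMap(parse))
      .getOrElse(ResetStrategy.Earliest)
}
```

With the properties from the report, resolve(props, None) yields Latest, while passing Some(ResetStrategy.Earliest) as the explicit choice still overrides the property.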
-- This message was sent by Atlassian Jira (v8.20.1#820001)