[jira] (FLINK-24851) KafkaSourceBuilder: auto.offset.reset is ignored
[ https://issues.apache.org/jira/browse/FLINK-24851 ]

liwei li deleted comment on FLINK-24851:
----------------------------------------

    was (Author: liliwei):
Also, what was the background of the artificial overwriting of this value? Or maybe it's just a bug?

> KafkaSourceBuilder: auto.offset.reset is ignored
> ------------------------------------------------
>
>                 Key: FLINK-24851
>                 URL: https://issues.apache.org/jira/browse/FLINK-24851
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.14.0
>            Reporter: Arseniy Tashoyan
>            Assignee: liwei li
>            Priority: Major
>
> Creating KafkaSource like this:
> {code:scala}
> val props = new Properties()
> props.put("bootstrap.servers", "localhost:9092")
> props.put("group.id", "group1")
> props.put("auto.offset.reset", "latest")
> val kafkaSource = KafkaSource.builder[String]()
>   .setProperties(props)
>   .build()
> {code}
> The actually used value for _"auto.offset.reset"_ is *"earliest"* instead of
> configured *"latest"*.
> This occurs because _"auto.offset.reset"_ gets overridden by
> _startingOffsetsInitializer.getAutoOffsetResetStrategy().name().toLowerCase()_.
> The default value for _startingOffsetsInitializer_ is _"earliest"_.
> This behavior is misleading.
> This behavior imposes an inconvenience on configuring the Kafka connector. We
> cannot use the Kafka setting _"auto.offset.reset"_ as-is. Instead we must
> extract this particular setting from other settings and propagate to
> _KafkaSourceBuilder.setStartingOffsets()_:
> {code:scala}
> val kafkaSource = KafkaSource.builder[String]()
>   .setProperties(props)
>   .setStartingOffsets(
>     OffsetsInitializer.committedOffsets(
>       OffsetResetStrategy.valueOf(
>         props.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
>           .asInstanceOf[String]
>           .toUpperCase(Locale.ROOT)
>       )
>     )
>   )
>   .build()
> {code}
> The expected behavior is to use the value of _"auto.offset.reset"_ provided
> by _KafkaSourceBuilder.setProperties()_ - unless overridden via
> _KafkaSourceBuilder.setStartingOffsets()_.
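
The precedence requested in the issue can be sketched in isolation. The following is a minimal model, not Flink code: the object `OffsetResetPrecedence`, its `resolve` method, and the `explicitStrategy` parameter are hypothetical names introduced only for illustration, and the "earliest" fallback is taken from the default described in the report. It uses nothing beyond `java.util.Properties`.

```scala
import java.util.{Locale, Properties}

// Hypothetical model of the expected precedence; this is NOT the Flink implementation.
object OffsetResetPrecedence {
  val Key = "auto.offset.reset"
  // Default reported in the issue for the starting offsets initializer.
  val BuilderDefault = "earliest"

  // explicitStrategy stands in for a value supplied through setStartingOffsets().
  // Order of precedence: explicit strategy, then the user property, then the default.
  def resolve(props: Properties, explicitStrategy: Option[String]): String =
    explicitStrategy
      .orElse(Option(props.getProperty(Key)))
      .map(_.toLowerCase(Locale.ROOT))
      .getOrElse(BuilderDefault)

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(Key, "latest")
    println(resolve(props, None))                  // property wins over the default
    println(resolve(props, Some("EARLIEST")))      // explicit strategy wins over the property
    println(resolve(new Properties(), None))       // nothing set: fall back to the default
  }
}
```

With this ordering, a property set through `setProperties()` is honored unless a strategy is passed explicitly, which is the behavior the reporter describes as expected.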
-- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] (FLINK-24851) KafkaSourceBuilder: auto.offset.reset is ignored
[ https://issues.apache.org/jira/browse/FLINK-24851 ]

liwei li deleted comment on FLINK-24851:
----------------------------------------

    was (Author: liliwei):
Also, can I know the background for doing this? Why override this value artificially? Or maybe it's just a bug?