[jira] (FLINK-24851) KafkaSourceBuilder: auto.offset.reset is ignored

2021-11-10 Thread liwei li (Jira)


[ https://issues.apache.org/jira/browse/FLINK-24851 ]


liwei li deleted comment on FLINK-24851:
--

was (Author: liliwei):
Also, what was the background of the artificial overwriting of this value?Or 
maybe it's just a bug?

> KafkaSourceBuilder: auto.offset.reset is ignored
> 
>
> Key: FLINK-24851
> URL: https://issues.apache.org/jira/browse/FLINK-24851
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.14.0
>Reporter: Arseniy Tashoyan
>Assignee: liwei li
>Priority: Major
>
> Creating KafkaSource like this:
> {code:scala}
> val props = new Properties()
> props.put("bootstrap.servers", "localhost:9092")
> props.put("group.id", "group1")
> props.put("auto.offset.reset", "latest")
> val kafkaSource = KafkaSource.builder[String]()
>   .setProperties(props)
>   .build()
> {code}
> The actually used value for _"auto.offset.reset"_ is *"earliest"* instead of 
> configured *"latest"*.
> This occurs because _"auto.offset.reset"_ gets overridden by 
> _startingOffsetsInitializer.getAutoOffsetResetStrategy().name().toLowerCase()_.
>  The default value for _startingOffsetsInitializer_ is _"earliest"_.
> This behavior is misleading.
> This behavior imposes an inconvenience on configuring the Kafka connector. We 
> cannot use the Kafka setting _"auto.offset.reset"_ as-is. Instead we must 
> extract this particular setting from other settings and propagate to 
> _KafkaSourceBuilder.setStartingOffsets()_:
> {code:scala}
> val kafkaSource = KafkaSource.builder[String]()
>   .setProperties(props)
>   .setStartingOffsets(
> OffsetsInitializer.committedOffsets(
>   OffsetResetStrategy.valueOf(
> props.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
>   .asInstanceOf[String]
>   .toUpperCase(Locale.ROOT)
>   )
> )
>   )
>   .build()
> {code}
> The expected behavior is to use the value of _"auto.offset.reset"_ provided 
> by _KafkaSourceBuilder.setProperties()_ - unless overridden via 
> _KafkaSourceBuilder. setStartingOffsets()_.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] (FLINK-24851) KafkaSourceBuilder: auto.offset.reset is ignored

2021-11-10 Thread liwei li (Jira)


[ https://issues.apache.org/jira/browse/FLINK-24851 ]


liwei li deleted comment on FLINK-24851:
--

was (Author: liliwei):
Also, can I know the background for doing this?Why override this value 
artificially?Or maybe it's just a bug?

> KafkaSourceBuilder: auto.offset.reset is ignored
> 
>
> Key: FLINK-24851
> URL: https://issues.apache.org/jira/browse/FLINK-24851
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.14.0
>Reporter: Arseniy Tashoyan
>Assignee: liwei li
>Priority: Major
>
> Creating KafkaSource like this:
> {code:scala}
> val props = new Properties()
> props.put("bootstrap.servers", "localhost:9092")
> props.put("group.id", "group1")
> props.put("auto.offset.reset", "latest")
> val kafkaSource = KafkaSource.builder[String]()
>   .setProperties(props)
>   .build()
> {code}
> The actually used value for _"auto.offset.reset"_ is *"earliest"* instead of 
> configured *"latest"*.
> This occurs because _"auto.offset.reset"_ gets overridden by 
> _startingOffsetsInitializer.getAutoOffsetResetStrategy().name().toLowerCase()_.
>  The default value for _startingOffsetsInitializer_ is _"earliest"_.
> This behavior is misleading.
> This behavior imposes an inconvenience on configuring the Kafka connector. We 
> cannot use the Kafka setting _"auto.offset.reset"_ as-is. Instead we must 
> extract this particular setting from other settings and propagate to 
> _KafkaSourceBuilder.setStartingOffsets()_:
> {code:scala}
> val kafkaSource = KafkaSource.builder[String]()
>   .setProperties(props)
>   .setStartingOffsets(
> OffsetsInitializer.committedOffsets(
>   OffsetResetStrategy.valueOf(
> props.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
>   .asInstanceOf[String]
>   .toUpperCase(Locale.ROOT)
>   )
> )
>   )
>   .build()
> {code}
> The expected behavior is to use the value of _"auto.offset.reset"_ provided 
> by _KafkaSourceBuilder.setProperties()_ - unless overridden via 
> _KafkaSourceBuilder. setStartingOffsets()_.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)