jigar-bhati commented on code in PR #267:
URL: https://github.com/apache/flink-connector-kafka/pull/267#discussion_r3476830857
##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaTableSource.java:
##########
@@ -462,52 +462,57 @@ protected DynamicKafkaSource<RowData> createDynamicKafkaSource(
.setDeserializer(kafkaDeserializer)
.setProperties(properties);
- switch (startupMode) {
- case EARLIEST:
- dynamicKafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.earliest());
+ dynamicKafkaSourceBuilder.setStartingOffsets(getStartingOffsetsInitializer());
Review Comment:
Good point. I revised this so the default behavior is preserved:
`auto.offset.reset` is only taken from user properties when explicitly
configured. Otherwise it remains initializer-derived as before.
This does mean that an explicitly configured `auto.offset.reset` is now
respected instead of ignored, but jobs that do not set it see no behavior change.
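A minimal sketch of the precedence rule described above, assuming the initializer-derived strategy has already been computed as a lowercase string such as `"earliest"`. The class name and the `resolveAutoOffsetReset` helper are hypothetical illustrations, not the actual `DynamicKafkaTableSource` code; only the `auto.offset.reset` key (the value of Kafka's `ConsumerConfig.AUTO_OFFSET_RESET_CONFIG`) comes from Kafka itself.

```java
import java.util.Properties;

/**
 * Hypothetical sketch (not Flink's implementation): an explicitly configured
 * auto.offset.reset wins; otherwise the value derived from the initializer is used.
 */
final class AutoOffsetResetResolution {

    // Same string as Kafka's ConsumerConfig.AUTO_OFFSET_RESET_CONFIG.
    static final String AUTO_OFFSET_RESET = "auto.offset.reset";

    /** Hypothetical helper: returns the user's value if present, else the initializer-derived one. */
    static String resolveAutoOffsetReset(Properties userProperties, String initializerDerived) {
        String explicit = userProperties.getProperty(AUTO_OFFSET_RESET);
        return explicit != null ? explicit : initializerDerived;
    }

    public static void main(String[] args) {
        // Not configured: unchanged behavior, the initializer decides.
        Properties unset = new Properties();
        System.out.println(resolveAutoOffsetReset(unset, "earliest")); // prints earliest

        // Explicitly configured: the user's value is respected instead of overwritten.
        Properties explicit = new Properties();
        explicit.setProperty(AUTO_OFFSET_RESET, "latest");
        System.out.println(resolveAutoOffsetReset(explicit, "earliest")); // prints latest
    }
}
```

The helper reads the key with `getProperty` rather than a default-valued lookup so that "not configured" and "configured" stay distinguishable, which is the distinction the comment relies on.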
##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializer.java:
##########
@@ -122,6 +124,31 @@ static OffsetsInitializer committedOffsets(OffsetResetStrategy offsetResetStrate
KafkaPartitionSplit.COMMITTED_OFFSET, offsetResetStrategy);
}
+ /**
+ * Get an {@link OffsetsInitializer} which delegates offset initialization to the given {@link
+ * OffsetsInitializer} and uses the given {@link OffsetResetStrategy} when Kafka needs to reset
+ * an initialized starting offset.
+ *
+ * <p>The offset reset strategy is only used when the returned initializer is used to initialize
+ * starting offsets. The initialized offsets themselves are unchanged, so initializers such as
+ * {@link #earliest()} and {@link #latest()} keep their normal startup behavior.
+ *
+ * @param offsetsInitializer the initializer which resolves the starting offsets.
+ * @param offsetResetStrategy the offset reset strategy to use when the initialized starting
+ * offsets are out of range.
+ * @return an {@link OffsetsInitializer} with the given offset reset strategy.
+ */
+ static OffsetsInitializer withOffsetResetStrategy(
Review Comment:
Yes, that was the behavior in the earlier version, and I agree it was not
ideal.
I removed `OffsetsInitializer.withOffsetResetStrategy(...)` and the wrapper
initializer. The PR now uses the existing
`ConsumerConfig.AUTO_OFFSET_RESET_CONFIG` path instead, including for the
DataStream API.
##########
docs/content/docs/connectors/datastream/kafka.md:
##########
@@ -235,7 +239,7 @@ for more details.
Please note that the following keys will be overridden by the builder even if
it is configured:
-- ```auto.offset.reset.strategy``` is overridden by ```OffsetsInitializer#getAutoOffsetResetStrategy()```
Review Comment:
Updated. The docs no longer mention the removed wrapper API and now say
`auto.offset.reset` is derived from the initializer only when it is not
explicitly configured.
##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java:
##########
@@ -382,7 +382,7 @@ public KafkaSourceBuilder<OUT> setRackIdSupplier(SerializableSupplier<String> ra
* created.
*
* <ul>
- * <li><code>auto.offset.reset.strategy</code> is overridden by {@link
+ * <li><code>auto.offset.reset</code> is overridden by {@link
Review Comment:
After the revision, I kept this in the same commit because respecting an
explicitly configured `ConsumerConfig.AUTO_OFFSET_RESET_CONFIG` in the DataStream
API is now the core of the fix rather than a side change. Happy to split it out
if the maintainers still prefer that.
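For context, a hypothetical sketch of the builder-side distinction between keys that are always forced and `auto.offset.reset`, which after this change is only defaulted when the user left it unset. The `maybeOverride` and `finalizeProperties` helpers are illustrative assumptions, not Flink's `KafkaSourceBuilder` code; forcing both deserializers to Kafka's `ByteArrayDeserializer` is included only as a contrasting example of an always-overridden key.

```java
import java.util.Properties;

/**
 * Hypothetical sketch (not Flink's code): some consumer keys are always forced by the
 * builder, while auto.offset.reset is only filled in when the user did not set it.
 */
final class BuilderPropertyOverrides {

    /**
     * Hypothetical helper: sets key to value when absent; when present, replaces the user's
     * value only if override is true. Returns true if a user-supplied value was replaced.
     */
    static boolean maybeOverride(Properties props, String key, String value, boolean override) {
        if (props.getProperty(key) == null) {
            props.setProperty(key, value);
            return false;
        }
        if (override) {
            props.setProperty(key, value);
            return true;
        }
        return false;
    }

    /** Applies the illustrative policy to a copy of the user's properties. */
    static Properties finalizeProperties(Properties userProps, String initializerDerivedReset) {
        Properties props = new Properties();
        props.putAll(userProps);
        String byteArray = "org.apache.kafka.common.serialization.ByteArrayDeserializer";
        // Always forced, regardless of what the user configured.
        maybeOverride(props, "key.deserializer", byteArray, true);
        maybeOverride(props, "value.deserializer", byteArray, true);
        // Only defaulted: an explicit user value is kept.
        maybeOverride(props, "auto.offset.reset", initializerDerivedReset, false);
        return props;
    }

    public static void main(String[] args) {
        Properties user = new Properties();
        user.setProperty("auto.offset.reset", "none");
        user.setProperty("key.deserializer", "com.example.HypotheticalDeserializer");
        Properties result = finalizeProperties(user, "earliest");
        System.out.println(result.getProperty("auto.offset.reset")); // prints none
        System.out.println(result.getProperty("key.deserializer"));
    }
}
```

Copying into a fresh `Properties` keeps the caller's object untouched, so the same user configuration can be finalized more than once without accumulating overrides.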
##########
docs/content.zh/docs/connectors/datastream/kafka.md:
##########
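@@ -222,7 +226,7 @@ Kafka Source supports two execution modes, streaming and batch. By default, KafkaSo
For Kafka consumer configuration, see the [Apache Kafka documentation](http://kafka.apache.org/documentation/#consumerconfigs).
Note that the builder overrides the following configuration items even if they are specified:
-- ```auto.offset.reset.strategy``` is overridden by OffsetsInitializer#getAutoOffsetResetStrategy()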
Review Comment:
Updated to match the English docs.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.