[ https://issues.apache.org/jira/browse/FLINK-3123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15907090#comment-15907090 ]
ASF GitHub Bot commented on FLINK-3123:
---------------------------------------
Github user rmetzger commented on a diff in the pull request:
https://github.com/apache/flink/pull/2687#discussion_r105621324
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
@@ -558,11 +629,31 @@ protected static void initializeSubscribedPartitionsToStartOffsets(
             List<KafkaTopicPartition> kafkaTopicPartitions,
             int indexOfThisSubtask,
             int numParallelSubtasks,
-            StartupMode startupMode) {
+            StartupMode startupMode,
+            Map<KafkaTopicPartition, Long> specificStartupOffsets) {
         for (int i = 0; i < kafkaTopicPartitions.size(); i++) {
             if (i % numParallelSubtasks == indexOfThisSubtask) {
-                subscribedPartitionsToStartOffsets.put(kafkaTopicPartitions.get(i), startupMode.getStateSentinel());
+                if (startupMode != StartupMode.SPECIFIC_OFFSETS) {
+                    subscribedPartitionsToStartOffsets.put(kafkaTopicPartitions.get(i), startupMode.getStateSentinel());
+                } else {
+                    if (specificStartupOffsets == null) {
+                        throw new IllegalArgumentException(
+                            "Startup mode for the consumer set to " + StartupMode.SPECIFIC_OFFSETS +
+                                ", but no specific offsets were specified");
+                    }
+
+                    KafkaTopicPartition partition = kafkaTopicPartitions.get(i);
+
+                    Long specificOffset = specificStartupOffsets.get(partition);
--- End diff --
What happens when the offset is negative or any other invalid number?
Is that handled by the individual fetchers?
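
For illustration only, here is a minimal sketch of the kind of up-front check this question points at. It is not code from the pull request: the class StartupOffsetValidation and the method requireValidOffset are hypothetical names, plain strings stand in for KafkaTopicPartition, and rejecting a missing or negative offset is an assumed policy, not something the diff above confirms.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper for illustration; not part of the pull request. */
public class StartupOffsetValidation {

    /**
     * Returns the user-supplied start offset for the given partition.
     * Assumed policy: a null map, a missing entry, or a negative offset
     * is rejected here instead of being passed on to a fetcher.
     * Partitions are plain strings as a stand-in for KafkaTopicPartition.
     */
    public static long requireValidOffset(Map<String, Long> specificStartupOffsets, String partition) {
        if (specificStartupOffsets == null) {
            throw new IllegalArgumentException("no specific offsets were specified");
        }
        Long offset = specificStartupOffsets.get(partition);
        if (offset == null) {
            throw new IllegalArgumentException("no offset specified for partition " + partition);
        }
        if (offset < 0) {
            throw new IllegalArgumentException(
                "invalid offset " + offset + " for partition " + partition);
        }
        return offset;
    }

    public static void main(String[] args) {
        Map<String, Long> offsets = new HashMap<>();
        offsets.put("myTopic-0", 23L);
        offsets.put("myTopic-1", -5L);

        // A non-negative offset is returned unchanged.
        System.out.println(requireValidOffset(offsets, "myTopic-0"));

        // A negative offset is rejected before any partition assignment happens.
        try {
            requireValidOffset(offsets, "myTopic-1");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Checking at this point would fail fast with a message that names the offending partition. Whether the fetchers already handle such values is exactly what the question above leaves open.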
> Allow setting custom start-offsets for the Kafka consumer
> ---------------------------------------------------------
>
> Key: FLINK-3123
> URL: https://issues.apache.org/jira/browse/FLINK-3123
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Reporter: Robert Metzger
> Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.0.0
>
>
> Currently, the Kafka consumer can only start reading from the earliest
> available offset or the current offset.
> Sometimes, users want to set a specific start offset themselves.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)