Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3505#discussion_r105851319
  
    --- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
 ---
    @@ -194,14 +194,29 @@ protected AbstractFetcher(
     
        /**
         * Restores the partition offsets.
    +    * The partitions in the provided map of restored partitions to offsets 
must completely match
    +    * the fetcher's subscribed partitions.
         * 
    -    * @param snapshotState The offsets for the partitions 
    +    * @param restoredOffsets The restored offsets for the partitions
    +    *
    +    * @throws IllegalStateException if the partitions in the provided 
restored offsets map
    +    * cannot completely match the fetcher's subscribed partitions.
         */
    -   public void restoreOffsets(Map<KafkaTopicPartition, Long> 
snapshotState) {
    -           for (KafkaTopicPartitionState<?> partition : allPartitions) {
    -                   Long offset = 
snapshotState.get(partition.getKafkaTopicPartition());
    -                   if (offset != null) {
    -                           partition.setOffset(offset);
    +   public void restoreOffsets(Map<KafkaTopicPartition, Long> 
restoredOffsets) {
    +           if (restoredOffsets.size() != allPartitions.length) {
    +                   throw new IllegalStateException(
    +                           "The fetcher was restored with partition 
offsets that do not " +
    +                                   "match with the subscribed partitions: 
" + restoredOffsets);
    --- End diff --
    
    I'm not sure if this check is too strict. I think its easily possible in 
Kafka to add new partitions to a topic.
    If a user does this, they can never restore their Kafka consumers.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to