GitHub user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3505#discussion_r105860279
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java ---
    @@ -194,14 +194,29 @@ protected AbstractFetcher(
     
        /**
         * Restores the partition offsets.
    +    * The partitions in the provided map of restored partitions to offsets must completely match
    +    * the fetcher's subscribed partitions.
         * 
    -    * @param snapshotState The offsets for the partitions 
    +    * @param restoredOffsets The restored offsets for the partitions
    +    *
    +    * @throws IllegalStateException if the partitions in the provided restored offsets map
    +    * cannot completely match the fetcher's subscribed partitions.
         */
    -   public void restoreOffsets(Map<KafkaTopicPartition, Long> snapshotState) {
    -           for (KafkaTopicPartitionState<?> partition : allPartitions) {
    -                   Long offset = snapshotState.get(partition.getKafkaTopicPartition());
    -                   if (offset != null) {
    -                           partition.setOffset(offset);
    +   public void restoreOffsets(Map<KafkaTopicPartition, Long> restoredOffsets) {
    +           if (restoredOffsets.size() != allPartitions.length) {
    +                   throw new IllegalStateException(
    +                           "The fetcher was restored with partition offsets that do not " +
    +                                   "match with the subscribed partitions: " + restoredOffsets);
    --- End diff --
    
    This would not happen with the changes in this PR.
    
    In `open()`, I've set `subscribedPartitions` to be exactly the same as the restored partition states. There is no filtering anymore. `allPartitions` here is basically the same list, just in state holder form.
    
    The condition checks exist simply because "setting the fetcher's subscribed partitions" and "restoring start offsets" are two separate calls (the former is passed in through the fetcher's constructor, while the latter is provided through the `restoreOffsets` method). I added these checks just to make the fetcher code more self-contained. These exceptions should never actually occur.
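
    The two-call pattern described above can be sketched roughly as follows. This is a simplified illustration, not the actual `AbstractFetcher` code: `FetcherSketch` and `getOffset` are hypothetical names, partitions are plain strings instead of `KafkaTopicPartition`, and the check compares the full key sets rather than only the sizes.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the fetcher; partitions are plain strings here.
class FetcherSketch {
    // Subscribed partition -> offset; a null value means "no offset set yet".
    private final Map<String, Long> offsets = new HashMap<>();

    // Call 1: the subscribed partitions arrive through the constructor.
    FetcherSketch(List<String> subscribedPartitions) {
        for (String partition : subscribedPartitions) {
            offsets.put(partition, null);
        }
    }

    // Call 2: the restored offsets arrive later, through a separate method.
    // Because the two calls are independent, the method verifies that both
    // sides describe exactly the same set of partitions before applying them.
    void restoreOffsets(Map<String, Long> restoredOffsets) {
        if (!restoredOffsets.keySet().equals(offsets.keySet())) {
            throw new IllegalStateException(
                "The fetcher was restored with partition offsets that do not "
                    + "match the subscribed partitions: " + restoredOffsets);
        }
        offsets.putAll(restoredOffsets);
    }

    // Returns the current offset for a partition, or null if none was set.
    Long getOffset(String partition) {
        return offsets.get(partition);
    }
}
```

    If the caller always passes the same partitions to both calls, as `open()` is said to do in this PR, the exception is unreachable and the check only documents the invariant.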
    
    I agree this might be a bit confusing for the code reader. In the recent refactorings in `master`, the setup of the fetcher's subscribed partitions and start offsets (regardless of whether it's a restore or a fresh start) is more atomic and less confusing in this respect.
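
    For comparison, a more atomic setup could look something like the sketch below. This is only an assumption about the general shape of such a design, not the actual code in `master`: `AtomicFetcherSketch` and its methods are hypothetical names, and partitions are again plain strings.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified fetcher; not the actual class in Flink's master branch.
class AtomicFetcherSketch {
    // Partition -> start offset; populated once, at construction time.
    private final Map<String, Long> partitionOffsets;

    // Partitions and their start offsets arrive together in a single map,
    // so there is no second call whose input could disagree with the first.
    AtomicFetcherSketch(Map<String, Long> partitionsWithStartOffsets) {
        this.partitionOffsets = new HashMap<>(partitionsWithStartOffsets);
    }

    // Number of partitions this fetcher is subscribed to.
    int subscribedPartitionCount() {
        return partitionOffsets.size();
    }

    // Returns the start offset for a partition, or null if it is not subscribed.
    Long getOffset(String partition) {
        return partitionOffsets.get(partition);
    }
}
```

    With a single entry point there is nothing left to cross-check, which is why a mismatch exception like the one in this diff would no longer be needed.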

