Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/3505#discussion_r105860279
--- Diff:
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
---
@@ -194,14 +194,29 @@ protected AbstractFetcher(
/**
* Restores the partition offsets.
+ * The partitions in the provided map of restored partitions to offsets must completely match
+ * the fetcher's subscribed partitions.
*
- * @param snapshotState The offsets for the partitions
+ * @param restoredOffsets The restored offsets for the partitions
+ *
+ * @throws IllegalStateException if the partitions in the provided restored offsets map
+ * cannot completely match the fetcher's subscribed partitions.
*/
- public void restoreOffsets(Map<KafkaTopicPartition, Long> snapshotState) {
- for (KafkaTopicPartitionState<?> partition : allPartitions) {
- Long offset = snapshotState.get(partition.getKafkaTopicPartition());
- if (offset != null) {
- partition.setOffset(offset);
+ public void restoreOffsets(Map<KafkaTopicPartition, Long> restoredOffsets) {
+ if (restoredOffsets.size() != allPartitions.length) {
+ throw new IllegalStateException(
+ "The fetcher was restored with partition offsets that do not " +
+ "match with the subscribed partitions: " + restoredOffsets);
--- End diff --
This would not happen with the changes of this PR.
In `open()`, I've set `subscribedPartitions` to be exactly the same as
the restored partition states. There is no filtering anymore.
`allPartitions` here is essentially the same list, just in its state-holder
form.
The condition checks exist simply because "setting the fetcher's
subscribed partitions" and "restoring start offsets" are two separate calls (the
former is passed in through the fetcher's constructor, while the latter is
provided through the `restoreOffsets` method). I added these checks just to
make the fetcher code more self-contained. These exceptions should never
actually occur.
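To illustrate the point about the two separate calls, here is a minimal sketch. It is not the actual `AbstractFetcher`: the class `FetcherSketch`, its `offsetOf` accessor, and the use of plain `String` partition ids are hypothetical simplifications. It also compares the full key sets, which is an assumption; the quoted part of the diff only shows the size comparison.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified stand-in for the fetcher; not Flink's AbstractFetcher.
class FetcherSketch {
    // Partition id -> current offset; the value stays null until offsets are restored.
    private final Map<String, Long> offsets = new HashMap<>();

    // Call 1: the subscribed partitions are fixed when the fetcher is constructed.
    FetcherSketch(Set<String> subscribedPartitions) {
        for (String partition : subscribedPartitions) {
            offsets.put(partition, null);
        }
    }

    // Call 2: restored offsets arrive separately, so the fetcher verifies on its
    // own that they cover exactly the partitions it was constructed with.
    void restoreOffsets(Map<String, Long> restoredOffsets) {
        if (!restoredOffsets.keySet().equals(offsets.keySet())) {
            throw new IllegalStateException(
                "The fetcher was restored with partition offsets that do not "
                    + "match with the subscribed partitions: " + restoredOffsets);
        }
        offsets.putAll(restoredOffsets);
    }

    // Hypothetical accessor, used only to observe the restored state.
    Long offsetOf(String partition) {
        return offsets.get(partition);
    }
}
```

When the caller always passes the same partition set to both calls, as `open()` does after this PR, the exception branch is unreachable; the check only guards the fetcher against a future caller that breaks that assumption.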
I agree this might be a bit confusing for the code reader. In the recent
refactorings in `master`, the setup of the fetcher's subscribed partitions and
start offsets (regardless of whether it's a restore or a fresh start) is more
atomic and less confusing in this respect.