Github user rmetzger commented on a diff in the pull request:
https://github.com/apache/flink/pull/3505#discussion_r105869364
--- Diff:
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
---
@@ -194,14 +194,29 @@ protected AbstractFetcher(
/**
* Restores the partition offsets.
+ * The partitions in the provided map of restored partitions to offsets
must completely match
+ * the fetcher's subscribed partitions.
*
- * @param snapshotState The offsets for the partitions
+ * @param restoredOffsets The restored offsets for the partitions
+ *
+ * @throws IllegalStateException if the partitions in the provided
restored offsets map
+ * cannot completely match the fetcher's subscribed partitions.
*/
- public void restoreOffsets(Map<KafkaTopicPartition, Long>
snapshotState) {
- for (KafkaTopicPartitionState<?> partition : allPartitions) {
- Long offset =
snapshotState.get(partition.getKafkaTopicPartition());
- if (offset != null) {
- partition.setOffset(offset);
+ public void restoreOffsets(Map<KafkaTopicPartition, Long>
restoredOffsets) {
+ if (restoredOffsets.size() != allPartitions.length) {
+ throw new IllegalStateException(
+ "The fetcher was restored with partition
offsets that do not " +
+ "match with the subscribed partitions:
" + restoredOffsets);
--- End diff --
I see. Thank you.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---