AHeise commented on code in PR #192:
URL: https://github.com/apache/flink-connector-kafka/pull/192#discussion_r2409711198
##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java:
##########
@@ -131,19 +134,84 @@ public KafkaSourceEnumerator(
         this.context = context;
         this.boundedness = boundedness;
-        this.assignedPartitions = new HashSet<>(kafkaSourceEnumState.assignedPartitions());
+        Map<AssignmentStatus, List<KafkaPartitionSplit>> splits =
+                initializeMigratedSplits(kafkaSourceEnumState.splits());
+        this.assignedSplits = indexByPartition(splits.get(AssignmentStatus.ASSIGNED));
+        this.unassignedSplits = indexByPartition(splits.get(AssignmentStatus.UNASSIGNED));
         this.pendingPartitionSplitAssignment = new HashMap<>();
         this.partitionDiscoveryIntervalMs =
                 KafkaSourceOptions.getOption(
                         properties,
                         KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS,
                         Long::parseLong);
         this.consumerGroupId = properties.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
-        this.unassignedInitialPartitions =
-                new HashSet<>(kafkaSourceEnumState.unassignedInitialPartitions());
         this.initialDiscoveryFinished = kafkaSourceEnumState.initialDiscoveryFinished();
     }
+    /**
+     * Initialize migrated splits to splits with concrete starting offsets. This method ensures that
+     * the costly offset resolution is performed only when there are splits that have been
+     * checkpointed with previous enumerator versions.
+     *
+     * <p>Note that this method is deliberately performed in the main thread to avoid a checkpoint
+     * of the splits without starting offset.
+     */
+    private Map<AssignmentStatus, List<KafkaPartitionSplit>> initializeMigratedSplits(
+            Set<SplitAndAssignmentStatus> splits) {
+        final Set<TopicPartition> migratedPartitions =
+                splits.stream()
+                        .filter(
+                                splitStatus ->
+                                        splitStatus.split().getStartingOffset()
+                                                == KafkaPartitionSplit.MIGRATED)
+                        .map(splitStatus -> splitStatus.split().getTopicPartition())
+                        .collect(Collectors.toSet());
+
+        if (migratedPartitions.isEmpty()) {
+            return splitByAssignmentStatus(splits.stream());
+        }
+
+        final Map<TopicPartition, Long> startOffsets =
+                startingOffsetInitializer.getPartitionOffsets(
+                        migratedPartitions, getOffsetsRetriever());
+        return splitByAssignmentStatus(
+                splits.stream()
+                        .map(splitStatus -> resolveMigratedSplit(splitStatus, startOffsets)));
Review Comment:
It's unfortunately necessary by design:
* Line 161 extracts the partitions that are then used to jointly look up the partition offsets.
* This lookup is expensive because it uses the admin client to contact the Kafka cluster.
* The offset initializer is designed to look up all partitions jointly, so that only one request is sent to the Kafka brokers.
* Now that we have received all offsets, line 179 applies them to the splits. It could be a simple map lookup, but I decided to add an assertion, so it went into a separate method.
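
The two-step pattern described above (batch all migrated partitions into one joint lookup, then apply the results with an assertion) can be sketched roughly as follows. Note this is a simplified illustration: `OffsetResolutionSketch`, the string-keyed maps, and the sentinel value are assumptions for the sketch, not the connector's actual types.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the batch-then-assert pattern; names are
// illustrative, not the actual connector API.
class OffsetResolutionSketch {
    // Placeholder sentinel marking a split checkpointed by an older
    // enumerator version (the real code uses KafkaPartitionSplit.MIGRATED).
    static final long MIGRATED = -4L;

    // Step 1: gather every partition whose starting offset is the sentinel,
    // so a single joint request can resolve them in one round trip.
    static Set<String> collectMigratedPartitions(Map<String, Long> startingOffsets) {
        Set<String> migrated = new HashSet<>();
        for (Map.Entry<String, Long> e : startingOffsets.entrySet()) {
            if (e.getValue() == MIGRATED) {
                migrated.add(e.getKey());
            }
        }
        return migrated;
    }

    // Step 2: apply the jointly fetched offsets to one split. A plain map
    // lookup would suffice, but the assertion catches partitions that were
    // somehow missing from the joint lookup result.
    static long resolveOffset(
            String partition, long startingOffset, Map<String, Long> fetchedOffsets) {
        if (startingOffset != MIGRATED) {
            return startingOffset; // already concrete; nothing to resolve
        }
        Long resolved = fetchedOffsets.get(partition);
        if (resolved == null) {
            throw new IllegalStateException(
                    "No offset fetched for migrated partition " + partition);
        }
        return resolved;
    }
}
```

Keeping the resolution in a dedicated method (rather than inlining the map lookup) is what makes room for the assertion without cluttering the stream pipeline.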
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]