arpit09 commented on code in PR #4009:
URL: https://github.com/apache/gobblin/pull/4009#discussion_r1705323654
##########
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java:
##########
@@ -482,20 +487,54 @@ private static void skipWorkUnit(WorkUnit workUnit) {
workUnit.setProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY,
workUnit.getLowWaterMark());
}
- private WorkUnit getWorkUnitForTopicPartition(KafkaPartition partition, SourceState state,
+ /**
+ * Get the workunits of all the partitions passed, this method fetches all the offsets for the partitions
+ * at once from kafka, and for each partiton creates a workunit.
+ * @param partitions
+ * @param state
+ * @param topicSpecificState
+ * @return
+ */
+ private Map<KafkaPartition, WorkUnit> getWorkUnits(Collection<KafkaPartition> partitions, SourceState state,
Optional<State> topicSpecificState) {
- Offsets offsets = new Offsets();
-
- boolean failedToGetKafkaOffsets = false;
-
+ final Map<KafkaPartition, Offsets> partitionOffsetMap = Maps.newHashMap();
+ final Set<KafkaPartition> fetchOffsetsFailedPartitions = Sets.newHashSet();
try (Timer.Context context = this.metricContext.timer(OFFSET_FETCH_TIMER).time()) {
- offsets.setOffsetFetchEpochTime(System.currentTimeMillis());
- offsets.setEarliestOffset(this.kafkaConsumerClient.get().getEarliestOffset(partition));
- offsets.setLatestOffset(this.kafkaConsumerClient.get().getLatestOffset(partition));
- } catch (Throwable t) {
- failedToGetKafkaOffsets = true;
- LOG.error("Caught error in creating work unit for {}", partition, t);
+ // Fetch the offsets for all the partitions at once
+ final Map<KafkaPartition, Long> earliestOffsetMap = this.kafkaConsumerClient.get().getEarliestOffsets(partitions);
+ final Map<KafkaPartition, Long> latestOffsetMap = this.kafkaConsumerClient.get().getLatestOffsets(partitions);
+ for (KafkaPartition partition : partitions) {
+ Offsets offsets = new Offsets();
+ offsets.setOffsetFetchEpochTime(System.currentTimeMillis());
+ if (earliestOffsetMap.containsKey(partition) && latestOffsetMap.containsKey(partition)) {
Review Comment:
Added. This check verifies that both the earliest offset and the latest offset
are available for the partition. If either one is missing, the offset fetch
failed for that partition, and it should be moved to the failed-partitions set
(fetchOffsetsFailedPartitions). I have added the same explanation to the code
comments.
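
For readers following along, the check described above can be sketched in isolation. This is only an illustrative, self-contained sketch and not the code in the PR: the class `OffsetCheckSketch`, the `Result` holder, the `classify` method, and the use of `String` partition names in place of `KafkaPartition` are all hypothetical stand-ins. It assumes the two bulk fetches return maps that may each be missing some partitions.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for illustration only; this is not Gobblin's KafkaSource.
final class OffsetCheckSketch {

  /** Offsets resolved per partition, plus the partitions whose offset fetch failed. */
  static final class Result {
    // Value layout (an assumption of this sketch): {earliestOffset, latestOffset}.
    final Map<String, long[]> offsets = new HashMap<>();
    final Set<String> failedPartitions = new HashSet<>();
  }

  /**
   * A partition counts as successfully fetched only when BOTH maps contain it.
   * Any partition missing from either map is recorded as failed.
   */
  static Result classify(Collection<String> partitions,
                         Map<String, Long> earliestOffsets,
                         Map<String, Long> latestOffsets) {
    Result result = new Result();
    for (String partition : partitions) {
      if (earliestOffsets.containsKey(partition) && latestOffsets.containsKey(partition)) {
        result.offsets.put(partition,
            new long[] {earliestOffsets.get(partition), latestOffsets.get(partition)});
      } else {
        // Discrepancy between the two fetches: treat the fetch as failed for this partition.
        result.failedPartitions.add(partition);
      }
    }
    return result;
  }
}
```

In this sketch, a partition that appears in only one of the two maps lands in `failedPartitions`, the same as a partition that appears in neither, so later work-unit creation never sees a half-populated pair of offsets.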