aditya1105 commented on code in PR #4009:
URL: https://github.com/apache/gobblin/pull/4009#discussion_r1704886165
##########
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java:
##########
@@ -440,29 +440,34 @@ private int calculateNumMappersForPacker(SourceState
state,
/*
* This function need to be thread safe since it is called in the Runnable
*/
- private List<WorkUnit> getWorkUnitsForTopic(KafkaTopic topic, SourceState
state,
- Optional<State> topicSpecificState, Optional<Set<Integer>>
filteredPartitions) {
+ public List<WorkUnit> getWorkUnitsForTopic(KafkaTopic topic, SourceState
state, Optional<State> topicSpecificState,
+ Optional<Set<Integer>> filteredPartitions) {
Timer.Context context =
this.metricContext.timer("isTopicQualifiedTimer").time();
boolean topicQualified = isTopicQualified(topic);
context.close();
- List<WorkUnit> workUnits = Lists.newArrayList();
- List<KafkaPartition> topicPartitions = topic.getPartitions();
- for (KafkaPartition partition : topicPartitions) {
- if(filteredPartitions.isPresent() &&
!filteredPartitions.get().contains(partition.getId())) {
- continue;
- }
- WorkUnit workUnit = getWorkUnitForTopicPartition(partition, state,
topicSpecificState);
- if (workUnit != null) {
- // For disqualified topics, for each of its workunits set the high
watermark to be the same
- // as the low watermark, so that it will be skipped.
- if (!topicQualified) {
- skipWorkUnit(workUnit);
- }
- workUnit.setProp(NUM_TOPIC_PARTITIONS, topicPartitions.size());
- workUnits.add(workUnit);
+ final List<WorkUnit> workUnits = Lists.newArrayList();
+ final List<KafkaPartition> topicPartitions = topic.getPartitions();
+ Map<KafkaPartition, WorkUnit> workUnitMap;
+
+ if (filteredPartitions.isPresent()) {
+ LOG.info("Filtered partitions for topic {} are {}", topic.getName(),
filteredPartitions.get());
+ final List<KafkaPartition> filteredPartitionsToBeProcessed =
topicPartitions.stream()
+ .filter(partition ->
filteredPartitions.get().contains(partition.getId()))
+ .collect(Collectors.toList());
+ workUnitMap = getWorkUnits(filteredPartitionsToBeProcessed, state,
topicSpecificState);
+ } else {
+ workUnitMap = getWorkUnits(topicPartitions, state, topicSpecificState);
+ }
+
+ for (WorkUnit workUnit : workUnitMap.values()) {
Review Comment:
In this method, the `workUnitMap` is initialized and contains work units for
partitions of a single topic. If the topic is not qualified, all of its work
units should be skipped; otherwise, none should be skipped. If this
understanding is correct, could we consider moving the `if (!topicQualified)`
check outside the loop?
##########
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinKafkaConsumerClient.java:
##########
@@ -66,6 +66,25 @@ public interface GobblinKafkaConsumerClient extends
Closeable {
*/
public long getEarliestOffset(KafkaPartition partition) throws
KafkaOffsetRetrievalFailureException;
+ /**
+ * Get the earliest available offset for a {@link Collection} of {@link
KafkaPartition}s. NOTE: The default implementation
+ * is not efficient i.e. it will make a getEarliest() call for every {@link
KafkaPartition}. Individual implementations
+ * of {@link GobblinKafkaConsumerClient} should override this method to use
more advanced APIs of the underlying KafkaConsumer
+ * to retrieve the earliest offsets for a collection of partitions.
+ *
+ * @param partitions for which earliest offset is retrieved
+ *
+ * @throws KafkaOffsetRetrievalFailureException - If the underlying
kafka-client does not support getting the earliest offset
+ */
+ public default Map<KafkaPartition, Long>
getEarliestOffsets(Collection<KafkaPartition> partitions)
Review Comment:
Nit: make variable immutable, across all the changes in this PR. Example
final Collection<KafkaPartition> partitions
final Map<KafkaPartition, Long> offsetMap
final KafkaPartition partition : partitions
##########
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java:
##########
@@ -482,20 +487,54 @@ private static void skipWorkUnit(WorkUnit workUnit) {
workUnit.setProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY,
workUnit.getLowWaterMark());
}
- private WorkUnit getWorkUnitForTopicPartition(KafkaPartition partition,
SourceState state,
+ /**
+ * Get the workunits of all the partitions passed, this method fetches all
the offsets for the partitions
+ * at once from Kafka, and for each partition creates a workunit.
+ * @param partitions
+ * @param state
+ * @param topicSpecificState
+ * @return
+ */
+ private Map<KafkaPartition, WorkUnit>
getWorkUnits(Collection<KafkaPartition> partitions, SourceState state,
Optional<State> topicSpecificState) {
- Offsets offsets = new Offsets();
-
- boolean failedToGetKafkaOffsets = false;
-
+ final Map<KafkaPartition, Offsets> partitionOffsetMap = Maps.newHashMap();
+ final Set<KafkaPartition> fetchOffsetsFailedPartitions = Sets.newHashSet();
try (Timer.Context context =
this.metricContext.timer(OFFSET_FETCH_TIMER).time()) {
Review Comment:
nit: final
##########
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java:
##########
@@ -157,9 +157,10 @@ public abstract class KafkaSource<S, D> extends
EventBasedSource<S, D> {
private final AtomicInteger offsetTooLateCount = new AtomicInteger(0);
// sharing the kafka consumer may result in contention, so support thread
local consumers
- protected final ConcurrentLinkedQueue<GobblinKafkaConsumerClient>
kafkaConsumerClientPool = new ConcurrentLinkedQueue();
+ protected final ConcurrentLinkedQueue<GobblinKafkaConsumerClient>
kafkaConsumerClientPool =
Review Comment:
There seem to be many whitespace changes in this PR. Could we please remove
the whitespace changes and retain only the newly added or modified changes?
##########
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java:
##########
@@ -482,20 +487,54 @@ private static void skipWorkUnit(WorkUnit workUnit) {
workUnit.setProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY,
workUnit.getLowWaterMark());
}
- private WorkUnit getWorkUnitForTopicPartition(KafkaPartition partition,
SourceState state,
+ /**
+ * Get the workunits of all the partitions passed, this method fetches all
the offsets for the partitions
+ * at once from Kafka, and for each partition creates a workunit.
+ * @param partitions
+ * @param state
+ * @param topicSpecificState
+ * @return
+ */
+ private Map<KafkaPartition, WorkUnit>
getWorkUnits(Collection<KafkaPartition> partitions, SourceState state,
Optional<State> topicSpecificState) {
- Offsets offsets = new Offsets();
-
- boolean failedToGetKafkaOffsets = false;
-
+ final Map<KafkaPartition, Offsets> partitionOffsetMap = Maps.newHashMap();
+ final Set<KafkaPartition> fetchOffsetsFailedPartitions = Sets.newHashSet();
try (Timer.Context context =
this.metricContext.timer(OFFSET_FETCH_TIMER).time()) {
- offsets.setOffsetFetchEpochTime(System.currentTimeMillis());
-
offsets.setEarliestOffset(this.kafkaConsumerClient.get().getEarliestOffset(partition));
-
offsets.setLatestOffset(this.kafkaConsumerClient.get().getLatestOffset(partition));
- } catch (Throwable t) {
- failedToGetKafkaOffsets = true;
- LOG.error("Caught error in creating work unit for {}", partition, t);
+ // Fetch the offsets for all the partitions at once
+ final Map<KafkaPartition, Long> earliestOffsetMap =
this.kafkaConsumerClient.get().getEarliestOffsets(partitions);
+ final Map<KafkaPartition, Long> latestOffsetMap =
this.kafkaConsumerClient.get().getLatestOffsets(partitions);
+ for (KafkaPartition partition : partitions) {
+ Offsets offsets = new Offsets();
+ offsets.setOffsetFetchEpochTime(System.currentTimeMillis());
+ if (earliestOffsetMap.containsKey(partition) &&
latestOffsetMap.containsKey(partition)) {
Review Comment:
Could you please add a comment explaining the purpose of this block?
Additionally, could you clarify why the `else` block only contains the
`contains` check and doesn't perform any action? Is this the intended behavior?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]