arpit09 commented on code in PR #4009:
URL: https://github.com/apache/gobblin/pull/4009#discussion_r1705321133
##########
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java:
##########
@@ -440,29 +440,34 @@ private int calculateNumMappersForPacker(SourceState
state,
/*
* This function need to be thread safe since it is called in the Runnable
*/
- private List<WorkUnit> getWorkUnitsForTopic(KafkaTopic topic, SourceState
state,
- Optional<State> topicSpecificState, Optional<Set<Integer>>
filteredPartitions) {
+ public List<WorkUnit> getWorkUnitsForTopic(KafkaTopic topic, SourceState
state, Optional<State> topicSpecificState,
+ Optional<Set<Integer>> filteredPartitions) {
Timer.Context context =
this.metricContext.timer("isTopicQualifiedTimer").time();
boolean topicQualified = isTopicQualified(topic);
context.close();
- List<WorkUnit> workUnits = Lists.newArrayList();
- List<KafkaPartition> topicPartitions = topic.getPartitions();
- for (KafkaPartition partition : topicPartitions) {
- if(filteredPartitions.isPresent() &&
!filteredPartitions.get().contains(partition.getId())) {
- continue;
- }
- WorkUnit workUnit = getWorkUnitForTopicPartition(partition, state,
topicSpecificState);
- if (workUnit != null) {
- // For disqualified topics, for each of its workunits set the high
watermark to be the same
- // as the low watermark, so that it will be skipped.
- if (!topicQualified) {
- skipWorkUnit(workUnit);
- }
- workUnit.setProp(NUM_TOPIC_PARTITIONS, topicPartitions.size());
- workUnits.add(workUnit);
+ final List<WorkUnit> workUnits = Lists.newArrayList();
+ final List<KafkaPartition> topicPartitions = topic.getPartitions();
+ Map<KafkaPartition, WorkUnit> workUnitMap;
+
+ if (filteredPartitions.isPresent()) {
+ LOG.info("Filtered partitions for topic {} are {}", topic.getName(),
filteredPartitions.get());
+ final List<KafkaPartition> filteredPartitionsToBeProcessed =
topicPartitions.stream()
+ .filter(partition ->
filteredPartitions.get().contains(partition.getId()))
+ .collect(Collectors.toList());
+ workUnitMap = getWorkUnits(filteredPartitionsToBeProcessed, state,
topicSpecificState);
+ } else {
+ workUnitMap = getWorkUnits(topicPartitions, state, topicSpecificState);
+ }
+
+ for (WorkUnit workUnit : workUnitMap.values()) {
Review Comment:
The `if (!topicQualified)` check has now been moved outside.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]