ZihanLi58 commented on code in PR #3798:
URL: https://github.com/apache/gobblin/pull/3798#discussion_r1357701681


##########
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java:
##########
@@ -268,17 +271,11 @@ public String apply(KafkaTopic topic) {
       // Create empty WorkUnits for skipped partitions (i.e., partitions that have previous offsets,
       // but aren't processed).
       createEmptyWorkUnitsForSkippedPartitions(workUnits, topicSpecificStateMap, state);
-      //determine the number of mappers
-      int maxMapperNum =
-          state.getPropAsInt(ConfigurationKeys.MR_JOB_MAX_MAPPERS_KEY, ConfigurationKeys.DEFAULT_MR_JOB_MAX_MAPPERS);
+
       KafkaWorkUnitPacker kafkaWorkUnitPacker = KafkaWorkUnitPacker.getInstance(this, state, Optional.of(this.metricContext));
-      int numOfMultiWorkunits = maxMapperNum;
-      if(state.contains(ConfigurationKeys.MR_TARGET_MAPPER_SIZE)) {
-        double totalEstDataSize = kafkaWorkUnitPacker.setWorkUnitEstSizes(workUnits);
-        LOG.info(String.format("The total estimated data size is %.2f", totalEstDataSize));
-        double targetMapperSize = state.getPropAsDouble(ConfigurationKeys.MR_TARGET_MAPPER_SIZE);
-        numOfMultiWorkunits = (int) (totalEstDataSize / targetMapperSize) + 1;
-        numOfMultiWorkunits = Math.min(numOfMultiWorkunits, maxMapperNum);
+      int numOfMultiWorkunits= 0;
+      if(!(kafkaWorkUnitPacker instanceof KafkaTopicGroupingWorkUnitPacker)) {

Review Comment:
   Why do we need this check?



##########
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java:
##########
@@ -306,6 +303,96 @@ public String apply(KafkaTopic topic) {
     }
   }
 
+  public List<WorkUnit> getWorkunitsForFilteredPartitions(SourceState state, Map<String, List<String>> filteredTopicPartitionMap, int minContainer) {

Review Comment:
   Add javadoc here?



##########
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinKafkaConsumerClient.java:
##########
@@ -169,6 +170,22 @@ default long committed(KafkaPartition partition) {
 
   public default void assignAndSeek(List<KafkaPartition> topicPartitions, Map<KafkaPartition, LongWatermark> topicWatermarksMap) { return; }
 
+  /**
+   * Get a list of all kafka topics
+   */
+  public default List<KafkaTopic> getTopics() {return Collections.emptyList();};

Review Comment:
   What's the reason for moving this method here?



##########
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java:
##########
@@ -306,6 +303,96 @@ public String apply(KafkaTopic topic) {
     }
   }
 
+  public List<WorkUnit> getWorkunitsForFilteredPartitions(SourceState state, Map<String, List<String>> filteredTopicPartitionMap, int minContainer) {

Review Comment:
   Why is minContainer needed here, and how will we determine it?



##########
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java:
##########
@@ -886,13 +997,24 @@ private class WorkUnitCreator implements Runnable {
     private final SourceState state;
     private final Optional<State> topicSpecificState;
     private final Map<String, List<WorkUnit>> allTopicWorkUnits;
+    private final Optional<Set<Integer>> filteredPartitionsId;
 
     WorkUnitCreator(KafkaTopic topic, SourceState state, Optional<State> topicSpecificState,
         Map<String, List<WorkUnit>> workUnits) {
       this.topic = topic;
       this.state = state;
       this.topicSpecificState = topicSpecificState;
       this.allTopicWorkUnits = workUnits;
+      this.filteredPartitionsId = Optional.absent();
+    }
+
+    WorkUnitCreator(KafkaTopic topic, SourceState state, Optional<State> topicSpecificState,

Review Comment:
   Duplicate code; can we extract the common part?
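
One way to remove this kind of duplication is constructor chaining, where the shorter constructor delegates to the fuller one through `this(...)`. The sketch below is only an illustration under stated assumptions: `WorkUnitCreatorSketch`, its `accepts` method, and the reduced field set are hypothetical stand-ins rather than the PR's code, and it uses `java.util.Optional` instead of the Guava `Optional` seen in the diff so that it compiles without dependencies.

```java
import java.util.Optional;
import java.util.Set;

// Hypothetical, simplified stand-in for KafkaSource.WorkUnitCreator.
// Assumption: java.util.Optional replaces Guava's Optional to avoid dependencies.
final class WorkUnitCreatorSketch {
  private final String topic;
  private final Optional<Set<Integer>> filteredPartitionIds;

  // Constructor without a partition filter: delegates instead of repeating assignments.
  WorkUnitCreatorSketch(String topic) {
    this(topic, Optional.empty());
  }

  // Full constructor: the only place where fields are assigned.
  WorkUnitCreatorSketch(String topic, Optional<Set<Integer>> filteredPartitionIds) {
    this.topic = topic;
    this.filteredPartitionIds = filteredPartitionIds;
  }

  // Returns true when the partition should be processed.
  // An absent filter is treated as "process every partition".
  boolean accepts(int partitionId) {
    return filteredPartitionIds.map(ids -> ids.contains(partitionId)).orElse(true);
  }

  String topic() {
    return topic;
  }
}
```

With this shape, adding another field later means touching only the full constructor, and the shorter one keeps working unchanged.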



##########
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java:
##########
@@ -306,6 +303,96 @@ public String apply(KafkaTopic topic) {
     }
   }
 
+  public List<WorkUnit> getWorkunitsForFilteredPartitions(SourceState state, Map<String, List<String>> filteredTopicPartitionMap, int minContainer) {
+    Map<String, List<WorkUnit>> kafkaTopicWorkunitMap = Maps.newConcurrentMap();

Review Comment:
   I feel most of the code here duplicates the getWorkunits method. Can we
extract the common code?
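
A possible shape for that extraction is a single private helper that takes an optional filter, with both public entry points delegating to it. Everything in this sketch is hypothetical: `WorkUnitPlanningSketch`, `planAll`, `planFiltered`, and `plan` are illustrative names, work units are reduced to strings, and partition ids are modelled as integers, so it shows the pattern rather than how KafkaSource would actually be refactored.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch: two entry points sharing one implementation.
final class WorkUnitPlanningSketch {

  // Unfiltered entry point (plays the role of the existing method).
  static List<String> planAll(Map<String, List<Integer>> topicPartitions) {
    return plan(topicPartitions, Optional.empty());
  }

  // Filtered entry point (plays the role of the new method).
  static List<String> planFiltered(Map<String, List<Integer>> topicPartitions,
      Map<String, List<Integer>> filter) {
    return plan(topicPartitions, Optional.of(filter));
  }

  // Shared logic: emits one placeholder work unit per partition that passes the filter.
  // An absent filter keeps every partition.
  private static List<String> plan(Map<String, List<Integer>> topicPartitions,
      Optional<Map<String, List<Integer>>> filter) {
    List<String> workUnits = new ArrayList<>();
    for (Map.Entry<String, List<Integer>> entry : topicPartitions.entrySet()) {
      for (Integer partition : entry.getValue()) {
        boolean keep = filter
            .map(f -> f.containsKey(entry.getKey()) && f.get(entry.getKey()).contains(partition))
            .orElse(true);
        if (keep) {
          workUnits.add(entry.getKey() + "-" + partition);
        }
      }
    }
    return workUnits;
  }
}
```

The filter is the only point where the two paths differ, so keeping it as a parameter avoids maintaining two copies of the iteration logic.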



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

