[ https://issues.apache.org/jira/browse/GOBBLIN-1922?focusedWorklogId=887293&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-887293 ]
ASF GitHub Bot logged work on GOBBLIN-1922:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 25/Oct/23 21:23
Start Date: 25/Oct/23 21:23
Worklog Time Spent: 10m
Work Description: ZihanLi58 commented on code in PR #3798:
URL: https://github.com/apache/gobblin/pull/3798#discussion_r1372340213
##########
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java:
##########
@@ -257,32 +277,42 @@ public String apply(KafkaTopic topic) {
Stopwatch createWorkUnitStopwatch = Stopwatch.createStarted();
for (KafkaTopic topic : topics) {
+ LOG.info("Discovered topic " + topic);
+ if (topic.getTopicSpecificState().isPresent()) {
+ topicSpecificStateMap.computeIfAbsent(topic.getName(), k -> new State())
+ .addAllIfNotExist(topic.getTopicSpecificState().get());
+ }
+ Optional<Set<Integer>> partitionIDSet = Optional.absent();
+ if(filteredTopicPartition.isPresent()) {
+ List<Integer> list = java.util.Optional.ofNullable(filteredTopicPartitionMap.get(topic.getName()))
+ .orElse(new ArrayList<>());
+ partitionIDSet = Optional.of(new HashSet<>(list));
+ LOG.info("Compute the workunit for topic {} with num of filtered partitions: {}",
+ topic.getName(), list.size());
+ }
+
threadPool.submit(
new WorkUnitCreator(topic, state, Optional.fromNullable(topicSpecificStateMap.get(topic.getName())),
- workUnits));
+ kafkaTopicWorkunitMap, partitionIDSet));
}
ExecutorsUtils.shutdownExecutorService(threadPool, Optional.of(LOG), 1L, TimeUnit.HOURS);
- LOG.info(String.format("Created workunits for %d topics in %d seconds", workUnits.size(),
+ LOG.info(String.format("Created workunits for %d topics in %d seconds", kafkaTopicWorkunitMap.size(),
createWorkUnitStopwatch.elapsed(TimeUnit.SECONDS)));
// Create empty WorkUnits for skipped partitions (i.e., partitions that have previous offsets,
// but aren't processed).
- createEmptyWorkUnitsForSkippedPartitions(workUnits, topicSpecificStateMap, state);
- //determine the number of mappers
- int maxMapperNum = state.getPropAsInt(ConfigurationKeys.MR_JOB_MAX_MAPPERS_KEY, ConfigurationKeys.DEFAULT_MR_JOB_MAX_MAPPERS);
+ createEmptyWorkUnitsForSkippedPartitions(kafkaTopicWorkunitMap, topicSpecificStateMap, state);
+
KafkaWorkUnitPacker kafkaWorkUnitPacker = KafkaWorkUnitPacker.getInstance(this, state, Optional.of(this.metricContext));
- int numOfMultiWorkunits = maxMapperNum;
- if(state.contains(ConfigurationKeys.MR_TARGET_MAPPER_SIZE)) {
- double totalEstDataSize = kafkaWorkUnitPacker.setWorkUnitEstSizes(workUnits);
- LOG.info(String.format("The total estimated data size is %.2f", totalEstDataSize));
- double targetMapperSize = state.getPropAsDouble(ConfigurationKeys.MR_TARGET_MAPPER_SIZE);
- numOfMultiWorkunits = (int) (totalEstDataSize / targetMapperSize) + 1;
- numOfMultiWorkunits = Math.min(numOfMultiWorkunits, maxMapperNum);
+ int numOfMultiWorkunits = minContainer.or(0);
+ if(state.contains(ConfigurationKeys.MR_JOB_MAX_MAPPERS_KEY)) {
+ numOfMultiWorkunits = Math.max(numOfMultiWorkunits,
+ calculateNumMappersForPacker(state, kafkaWorkUnitPacker, kafkaTopicWorkunitMap));
}
- addTopicSpecificPropsToWorkUnits(workUnits, topicSpecificStateMap);
- List<WorkUnit> workUnitList = kafkaWorkUnitPacker.pack(workUnits, numOfMultiWorkunits);
+
+ addTopicSpecificPropsToWorkUnits(kafkaTopicWorkunitMap, topicSpecificStateMap);
+ List<WorkUnit> workUnitList = kafkaWorkUnitPacker.pack(kafkaTopicWorkunitMap, numOfMultiWorkunits);
Review Comment:
I'm not sure this method achieves the goal: in the scenario where we pack all
partitions into the same container, those partitions are normally pretty
lightweight.
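
For reference, the container-count arithmetic touched by this hunk can be
sketched as follows. This is only an illustration: it assumes that
calculateNumMappersForPacker (whose body is not shown in the hunk) keeps the
size-based estimate from the removed lines, and the class and method names
below are hypothetical, not Gobblin APIs.

```java
import java.util.OptionalInt;

// Hypothetical stand-alone sketch; not part of Gobblin.
public class MapperCountSketch {

    // Size-based estimate as in the removed lines: one mapper per
    // targetMapperSize of estimated data, plus one, capped at maxMappers.
    static int sizeBasedMappers(double totalEstDataSize, double targetMapperSize, int maxMappers) {
        int bySize = (int) (totalEstDataSize / targetMapperSize) + 1;
        return Math.min(bySize, maxMappers);
    }

    // New behaviour in the hunk: start from minContainer (0 when absent)
    // and take the larger of that and the computed mapper count.
    static int numOfMultiWorkunits(OptionalInt minContainer, int computedMappers) {
        return Math.max(minContainer.orElse(0), computedMappers);
    }

    public static void main(String[] args) {
        // Lightweight partitions: the size-based estimate collapses to 1.
        int light = sizeBasedMappers(10.0, 100.0, 50);
        System.out.println(numOfMultiWorkunits(OptionalInt.empty(), light)); // 1
        // A minimum container count raises the result regardless of size.
        System.out.println(numOfMultiWorkunits(OptionalInt.of(4), light));   // 4
    }
}
```

Under these assumptions, lightweight partitions yield a size-based estimate of
1, so only minContainer can raise the resulting container count.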
Issue Time Tracking
-------------------
Worklog Id: (was: 887293)
Time Spent: 2h (was: 1h 50m)
> Create work units for selected topics partitions in Kafka source
> ----------------------------------------------------------------
>
> Key: GOBBLIN-1922
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1922
> Project: Apache Gobblin
> Issue Type: New Feature
> Components: gobblin-kafka
> Reporter: Hanghang Liu
> Assignee: Shirshanka Das
> Priority: Major
> Time Spent: 2h
> Remaining Estimate: 0h
>
> Add a new feature in the Kafka source to create work units for selected topic
> partitions. This feature provides the following:
> # Recompute and split tasks (work units) at runtime for selected topic
> partitions, instead of recomputing for all Kafka topics
> # Make topic-level replanning feasible
> # Move one step closer to dynamic work unit allocation
> A follow-up is needed to make the work unit packer able to pack the recomputed
> work units into a desired number of containers.
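
The per-topic partition selection described above can be illustrated with a
minimal sketch. It uses java.util.Optional rather than the Guava Optional seen
in the PR, and the class and method names are hypothetical. How the work unit
creator treats an absent versus an empty set is not shown in the PR excerpt, so
the comments below describe only what this sketch returns.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical stand-alone sketch; not part of Gobblin.
public class PartitionFilterSketch {

    // Returns an empty Optional when no filter is configured at all, and a
    // (possibly empty) set of partition ids for the topic when one is.
    static Optional<Set<Integer>> partitionIdsFor(
            String topic, Optional<Map<String, List<Integer>>> filter) {
        if (!filter.isPresent()) {
            return Optional.empty();
        }
        List<Integer> ids = filter.get().getOrDefault(topic, Collections.emptyList());
        Set<Integer> idSet = new HashSet<>(ids);
        return Optional.of(idSet);
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> selected = new HashMap<>();
        selected.put("topicA", Arrays.asList(0, 2, 2));

        // Duplicates collapse because the ids are stored in a set.
        System.out.println(partitionIdsFor("topicA", Optional.of(selected)).get().size());
        // A topic missing from the filter maps to an empty set.
        System.out.println(partitionIdsFor("topicB", Optional.of(selected)).get().isEmpty());
        // No filter configured: nothing is selected explicitly.
        System.out.println(partitionIdsFor("topicA", Optional.empty()).isPresent());
    }
}
```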