Copilot commented on code in PR #10321:
URL: https://github.com/apache/ozone/pull/10321#discussion_r3343924933
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTask.java:
##########
@@ -172,44 +179,58 @@ public String getDescription() {
@Override
public TaskResult process(
OMUpdateEventBatch events, Map<String, Integer> subTaskSeekPosMap) {
- boolean anyFailure = false; // Track if any bucket fails
Map<String, Integer> updatedSeekPositions = new HashMap<>();
- // Process FSO bucket
- Integer bucketSeek = subTaskSeekPosMap.getOrDefault(BucketType.FSO.name(),
0);
- Pair<Integer, Boolean> bucketResult =
nsSummaryTaskWithFSO.processWithFSO(events, bucketSeek);
- updatedSeekPositions.put(BucketType.FSO.name(), bucketResult.getLeft());
- if (!bucketResult.getRight()) {
- LOG.error("processWithFSO failed.");
- anyFailure = true;
- }
-
- // Process Legacy bucket
- bucketSeek = subTaskSeekPosMap.getOrDefault(BucketType.LEGACY.name(), 0);
- bucketResult = nsSummaryTaskWithLegacy.processWithLegacy(events,
bucketSeek);
- updatedSeekPositions.put(BucketType.LEGACY.name(), bucketResult.getLeft());
- if (!bucketResult.getRight()) {
- LOG.error("processWithLegacy failed.");
- anyFailure = true;
- }
-
- // Process OBS bucket
- bucketSeek = subTaskSeekPosMap.getOrDefault(BucketType.OBS.name(), 0);
- bucketResult = nsSummaryTaskWithOBS.processWithOBS(events, bucketSeek);
- updatedSeekPositions.put(BucketType.OBS.name(), bucketResult.getLeft());
- if (!bucketResult.getRight()) {
- LOG.error("processWithOBS failed.");
- anyFailure = true;
- }
+ int fsoSeek = subTaskSeekPosMap.getOrDefault(BucketType.FSO.name(), 0);
+ int legacySeek = subTaskSeekPosMap.getOrDefault(BucketType.LEGACY.name(),
0);
+ int obsSeek = subTaskSeekPosMap.getOrDefault(BucketType.OBS.name(), 0);
+
+ Future<Pair<Integer, Boolean>> fsoFuture = subTaskExecutor.submit(
+ () -> nsSummaryTaskWithFSO.processWithFSO(events, fsoSeek));
+ Future<Pair<Integer, Boolean>> legacyFuture = subTaskExecutor.submit(
+ () -> nsSummaryTaskWithLegacy.processWithLegacy(events, legacySeek));
+ Future<Pair<Integer, Boolean>> obsFuture = subTaskExecutor.submit(
+ () -> nsSummaryTaskWithOBS.processWithOBS(events, obsSeek));
Review Comment:
Once the executor is changed to a shared static constant (to avoid leaking
threads for each staged task instance), this submit call should use that shared
constant as well.
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTask.java:
##########
@@ -83,6 +83,13 @@ public class NSSummaryTask implements ReconOmTask {
private final NSSummaryTaskWithLegacy nsSummaryTaskWithLegacy;
private final NSSummaryTaskWithOBS nsSummaryTaskWithOBS;
+ // Shared executor for the three FSO/Legacy/OBS sub-tasks during process().
+ // The sub-tasks operate on disjoint slices of the event stream (filtered by
+ // table and bucket layout) and write to disjoint NSSummary entries, so they
+ // are safe to run in parallel.
+ private final ExecutorService subTaskExecutor = Executors.newFixedThreadPool(
+ 3, new
ThreadFactoryBuilder().setNameFormat("NSSummarySubTask-%d").setDaemon(true).build());
Review Comment:
The sub-task executor is currently created per NSSummaryTask instance and
never shut down. Recon creates staged task instances during re-initialization
(see ReconTaskControllerImpl calling getStagedTask(...)), so this will leak 3
daemon threads per reprocess run. Also, the current initializer line exceeds
the 120-char Checkstyle limit.
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTask.java:
##########
@@ -172,44 +179,58 @@ public String getDescription() {
@Override
public TaskResult process(
OMUpdateEventBatch events, Map<String, Integer> subTaskSeekPosMap) {
- boolean anyFailure = false; // Track if any bucket fails
Map<String, Integer> updatedSeekPositions = new HashMap<>();
- // Process FSO bucket
- Integer bucketSeek = subTaskSeekPosMap.getOrDefault(BucketType.FSO.name(),
0);
- Pair<Integer, Boolean> bucketResult =
nsSummaryTaskWithFSO.processWithFSO(events, bucketSeek);
- updatedSeekPositions.put(BucketType.FSO.name(), bucketResult.getLeft());
- if (!bucketResult.getRight()) {
- LOG.error("processWithFSO failed.");
- anyFailure = true;
- }
-
- // Process Legacy bucket
- bucketSeek = subTaskSeekPosMap.getOrDefault(BucketType.LEGACY.name(), 0);
- bucketResult = nsSummaryTaskWithLegacy.processWithLegacy(events,
bucketSeek);
- updatedSeekPositions.put(BucketType.LEGACY.name(), bucketResult.getLeft());
- if (!bucketResult.getRight()) {
- LOG.error("processWithLegacy failed.");
- anyFailure = true;
- }
-
- // Process OBS bucket
- bucketSeek = subTaskSeekPosMap.getOrDefault(BucketType.OBS.name(), 0);
- bucketResult = nsSummaryTaskWithOBS.processWithOBS(events, bucketSeek);
- updatedSeekPositions.put(BucketType.OBS.name(), bucketResult.getLeft());
- if (!bucketResult.getRight()) {
- LOG.error("processWithOBS failed.");
- anyFailure = true;
- }
+ int fsoSeek = subTaskSeekPosMap.getOrDefault(BucketType.FSO.name(), 0);
+ int legacySeek = subTaskSeekPosMap.getOrDefault(BucketType.LEGACY.name(),
0);
+ int obsSeek = subTaskSeekPosMap.getOrDefault(BucketType.OBS.name(), 0);
+
+ Future<Pair<Integer, Boolean>> fsoFuture = subTaskExecutor.submit(
+ () -> nsSummaryTaskWithFSO.processWithFSO(events, fsoSeek));
+ Future<Pair<Integer, Boolean>> legacyFuture = subTaskExecutor.submit(
+ () -> nsSummaryTaskWithLegacy.processWithLegacy(events, legacySeek));
Review Comment:
Once the executor is changed to a shared static constant (to avoid leaking
threads for each staged task instance), these submit calls should use that
shared constant.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]