Copilot commented on code in PR #10595:
URL: https://github.com/apache/ozone/pull/10595#discussion_r3463317259


##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java:
##########
@@ -275,7 +307,70 @@ private void addToQueue(AbstractReplicationTask task) {
             k -> new AtomicInteger()).incrementAndGet();
       }
       queuedCounter.get(task.getMetricName()).incrementAndGet();
-      executor.execute(new TaskRunner(task));
+      getExecutorForTask(task).execute(new TaskRunner(task));
+    }
+  }
+
+  private ExecutorService getExecutorForTask(AbstractReplicationTask task) {
+    cleanupFailedVolumeExecutors();
+    HddsVolume volume = task.getVolume();
+    if (volume != null) {
+      return getOrCreateVolumeExecutor(volume);
+    }
+    return executor;
+  }
+
+  private synchronized ThreadPoolExecutor getOrCreateVolumeExecutor(HddsVolume 
volume) {
+    return volumeExecutors.computeIfAbsent(volume, v -> {
+      String threadNamePrefix = context != null ? 
context.getThreadNamePrefix() : "";
+      ThreadFactory threadFactory = new ThreadFactoryBuilder()
+          .setDaemon(true)
+          .setNameFormat(threadNamePrefix + "ContainerReplicationThread-" + 
v.getStorageID() + "-%d")
+          .build();
+      ThreadPoolExecutor tpe = new ThreadPoolExecutor(
+          currentThreadCount,
+          currentThreadCount,
+          60, TimeUnit.SECONDS,
+          new PriorityBlockingQueue<>(),
+          threadFactory);
+      LOG.info("Created replication executor for volume {} with size {}", 
v.getStorageID(), currentThreadCount);
+      return tpe;
+    });
+  }
+
+  private synchronized void cleanupFailedVolumeExecutors() {
+    if (context == null) {
+      return;
+    }
+    OzoneContainer container = context.getParent().getContainer();
+    if (container == null) {
+      return;
+    }
+    MutableVolumeSet volumeSet = container.getVolumeSet();
+    if (volumeSet == null) {
+      return;
+    }
+
+    List<StorageVolume> healthyVolumes = volumeSet.getVolumesList();
+    Iterator<Map.Entry<HddsVolume, ThreadPoolExecutor>> it = 
volumeExecutors.entrySet().iterator();
+    while (it.hasNext()) {
+      Map.Entry<HddsVolume, ThreadPoolExecutor> entry = it.next();
+      HddsVolume volume = entry.getKey();
+      if (!healthyVolumes.contains(volume)) {
+        LOG.info("Volume {} is no longer healthy/present. Shutting down its 
replication thread pool.",
+            volume.getStorageID());
+        ThreadPoolExecutor executorToShutdown = entry.getValue();
+        executorToShutdown.shutdown();
+        try {
+          if (!executorToShutdown.awaitTermination(3, TimeUnit.SECONDS)) {
+            executorToShutdown.shutdownNow();
+          }
+        } catch (InterruptedException e) {
+          executorToShutdown.shutdownNow();
+          Thread.currentThread().interrupt();
+        }
+        it.remove();

Review Comment:
   `volumeExecutors` is a `ConcurrentHashMap`, so its iterator does not support 
`Iterator.remove()` and will throw `UnsupportedOperationException` at runtime 
when a volume is removed. Remove the entry via the map instead.



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java:
##########
@@ -438,6 +572,10 @@ public void run() {
         opsLatencyMs.get(task.getMetricName()).add(Time.monotonicNow() - 
startTime);
         inFlight.remove(task);
         decrementTaskCounter(task);
+        if (task.getStatus() == Status.QUEUED) {
+          task.updateQueuedTime();
+          scheduler.schedule(() -> addTask(task), 1, TimeUnit.SECONDS);
+        }

Review Comment:
   `scheduler.schedule(...)` can throw `RejectedExecutionException` if `stop()` 
is called while a task finishes with status `QUEUED`. Since this call is in the 
`finally` block, the exception would escape `run()` and can disrupt executor 
threads during shutdown. Guard or swallow the rejection when the supervisor is 
stopping.



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java:
##########
@@ -57,8 +67,27 @@ public void copyData(long containerId, OutputStream 
destination,
           " is not found.", CONTAINER_NOT_FOUND);
     }
 
-    controller.exportContainer(
-        container.getContainerType(), containerId, destination,
-        new TarContainerPacker(compression));
+    HddsVolume volume = (HddsVolume) container.getContainerData().getVolume();
+    if (volume != null) {
+      if (volume.getActiveOutboundReplications() >=
+          config.getVolumeOutboundLimit()) {
+        LOG.info("Volume {} has reached the maximum number of concurrent " +
+                "replication reads ({})", volume.getStorageID(),
+            config.getVolumeOutboundLimit());
+        throw new StorageContainerException("Volume " + volume.getStorageID() +
+            " has reached the maximum number of concurrent replication reads ("
+            + config.getVolumeOutboundLimit() + ")", 
REPLICATION_LIMIT_REACHED);
+      }
+      volume.incActiveOutboundReplications();
+    }

Review Comment:
   The outbound replication limit check is not atomic: multiple threads can 
observe `activeOutboundReplications < limit` and all increment, exceeding the 
configured limit. Use an atomic increment-and-check pattern (and decrement on 
overflow) to enforce the limit correctly under concurrency.



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationProcessor.java:
##########
@@ -105,6 +112,12 @@ public void processAll(ReplicationQueue queue) {
             .getMetrics().incrPendingReplicationLimitReachedTotal();
         break;
       }
+      if (reconstructionLimitReached(replicationManager)) {
+        LOG.info("The maximum number of pending reconstruction commands ({}) " 
+
+                "are scheduled. Ending the iteration.",
+            replicationManager.getReconstructionInFlightLimit());
+        break;

Review Comment:
   When the reconstruction inflight limit is reached, `processAll` breaks 
before dequeuing any items. This stalls processing of *all* under-replicated 
containers (including non-EC ones) until reconstruction work drains, which can 
delay normal replication and deletes. If the intent is to throttle only EC 
reconstruction, the limit check likely needs to be applied conditionally per 
health result (or inside `processUnderReplicatedContainer`).



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java:
##########
@@ -79,12 +86,27 @@ public Path getContainerDataFromReplicas(
       } catch (InterruptedException e) {
         logError(e, containerId, datanode, i, shuffledDatanodes.size());
         Thread.currentThread().interrupt();
+        throw new IOException("Interrupted during container download", e);
+      } catch (ExecutionException e) {
+        if (e.getCause() instanceof StatusRuntimeException &&
+            ((StatusRuntimeException) e.getCause()).getStatus().getCode() ==
+                Status.Code.RESOURCE_EXHAUSTED) {
+          resourceExhaustedCount++;
+        }
+        logError(e, containerId, datanode, i, shuffledDatanodes.size());
       } catch (Exception e) {
         logError(e, containerId, datanode, i, shuffledDatanodes.size());
       } finally {
         IOUtils.close(LOG, client);
       }
     }
+
+    if (resourceExhaustedCount > 0) {
+      throw new StorageContainerException("All sources are busy or failed " +
+          "for container " + containerId,
+          ContainerProtos.Result.REPLICATION_LIMIT_REACHED);
+    }

Review Comment:
   This throws `REPLICATION_LIMIT_REACHED` if *any* source returns 
`RESOURCE_EXHAUSTED`, even when other sources failed for different reasons (e.g. 
container missing, auth failure). That can incorrectly requeue replication work 
and mask real failures. Consider only mapping to `REPLICATION_LIMIT_REACHED` 
when all attempted sources were `RESOURCE_EXHAUSTED`.



##########
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java:
##########
@@ -1801,4 +1803,70 @@ private void mockReplicationCommandCounts(
         });
   }
 
+  @Test
+  public void testInflightReconstructionLimit() throws IOException, 
NodeNotFoundException {
+    rmConf.setReconstructionGlobalLimit(2);
+    ReplicationManager rm = createReplicationManager();
+    assertEquals(2, rm.getReconstructionInFlightLimit());
+    assertEquals(0, rm.getInflightReconstructionCount());
+    assertFalse(rm.isReconstructionLimitReached());
+
+    mockReplicationCommandCounts(dn -> 0, dn -> 0);
+
+    ContainerInfo container = ReplicationTestUtil.createContainerInfo(
+        repConfig, 1, HddsProtos.LifeCycleState.CLOSED, 10, 20);
+    

Review Comment:
   This line contains trailing whitespace (spaces on an otherwise blank line), 
which can trigger style checks. Please remove the whitespace.



##########
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java:
##########
@@ -1801,4 +1803,70 @@ private void mockReplicationCommandCounts(
         });
   }
 
+  @Test
+  public void testInflightReconstructionLimit() throws IOException, 
NodeNotFoundException {
+    rmConf.setReconstructionGlobalLimit(2);
+    ReplicationManager rm = createReplicationManager();
+    assertEquals(2, rm.getReconstructionInFlightLimit());
+    assertEquals(0, rm.getInflightReconstructionCount());
+    assertFalse(rm.isReconstructionLimitReached());
+
+    mockReplicationCommandCounts(dn -> 0, dn -> 0);
+
+    ContainerInfo container = ReplicationTestUtil.createContainerInfo(
+        repConfig, 1, HddsProtos.LifeCycleState.CLOSED, 10, 20);
+    
+    // Send one reconstruction command with 2 fragments
+    ReconstructECContainersCommand cmd1 = new ReconstructECContainersCommand(
+        1L, Collections.emptyList(),
+        ImmutableList.of(MockDatanodeDetails.randomDatanodeDetails(),
+            MockDatanodeDetails.randomDatanodeDetails()),
+        integers2ByteString(ImmutableList.of(1, 2)), (ECReplicationConfig) 
repConfig);
+
+    rm.sendThrottledReconstructionCommand(container, cmd1);
+    assertEquals(1, rm.getInflightReconstructionCount());
+    assertFalse(rm.isReconstructionLimitReached());
+
+    // Send another reconstruction command with 1 fragment
+    ReconstructECContainersCommand cmd2 = new ReconstructECContainersCommand(
+        2L, Collections.emptyList(),
+        ImmutableList.of(MockDatanodeDetails.randomDatanodeDetails()),
+        integers2ByteString(ImmutableList.of(3)), (ECReplicationConfig) 
repConfig);    

Review Comment:
   Trailing whitespace here can cause checkstyle / spotless failures. Please 
remove the extra spaces at the end of the line.



##########
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java:
##########
@@ -1226,4 +1230,78 @@ private void scheduleTasks(
       rs.addTask(new ReplicationTask(fromSources(i, sources), noopReplicator));
     }
   }
+
+  @ContainerLayoutTestInfo.ContainerTest
+  public void testVolumeSpecificThreadPoolAndCleanup(ContainerLayoutVersion 
layout) throws Exception {
+    this.layoutVersion = layout;
+    replicatorRef.set(slowReplicator);
+    DatanodeStateMachine stateMachine = context.getParent();
+    OzoneContainer ozoneContainer = mock(OzoneContainer.class);
+    MutableVolumeSet volumeSet = mock(MutableVolumeSet.class);
+    when(stateMachine.getContainer()).thenReturn(ozoneContainer);
+    when(ozoneContainer.getVolumeSet()).thenReturn(volumeSet);
+
+    HddsVolume vol1 = mock(HddsVolume.class);
+    when(vol1.getStorageID()).thenReturn("vol-1");
+    HddsVolume vol2 = mock(HddsVolume.class);
+    when(vol2.getStorageID()).thenReturn("vol-2");
+
+    List<StorageVolume> healthyVolumes = new ArrayList<>();
+    healthyVolumes.add(vol1);
+    healthyVolumes.add(vol2);
+    when(volumeSet.getVolumesList()).thenReturn(healthyVolumes);
+
+    // Build supervisor with volume chooser
+    ReplicationSupervisor supervisor = ReplicationSupervisor.newBuilder()
+        .stateContext(context)
+        .clock(clock)
+        .volumeChooser(task -> {
+          if (task.getContainerId() == 1L || task.getContainerId() == 4L) {
+            return vol1;
+          } else if (task.getContainerId() == 2L) {
+            return vol2;
+          }
+          return null; // Fallback
+        })
+        .build();
+
+    supervisor.setReplicationMaxStreams(1);
+
+    try {
+      AbstractReplicationTask task1 = createTask(1L);
+      AbstractReplicationTask task2 = createTask(2L);
+      AbstractReplicationTask task3 = createTask(3L); // No volume
+
+      supervisor.addTask(task1);
+      supervisor.addTask(task2);
+      supervisor.addTask(task3);
+
+      // Verify tasks got correct volume assigned
+      assertEquals(vol1, task1.getVolume());
+      assertEquals(vol2, task2.getVolume());
+      assertNull(task3.getVolume());
+
+      // Now, simulate vol2 failure (remove it from healthyVolumes)
+      healthyVolumes.remove(vol2);
+
+      // Run cleanup and queue task4 on vol1 (which already has task1 running, 
so task4 will queue)
+      AbstractReplicationTask task4 = createTask(4L);
+      supervisor.addTask(task4);
+
+      // Since max streams is 1, task1 is executing and task4 is in vol1's 
executor queue.
+      // We should see a queue size of at least 1 (potentially more depending 
on fallback executor state).
+      assertTrue(supervisor.getQueueSize() >= 1);
+
+      // Thread pool for vol2 should be shut down, while vol1 remains active
+      java.lang.reflect.Field field = 
ReplicationSupervisor.class.getDeclaredField("volumeExecutors");
+      field.setAccessible(true);
+      Map<?, ?> volumeExecutors = (Map<?, ?>) field.get(supervisor);

Review Comment:
   This test uses reflection to access `ReplicationSupervisor.volumeExecutors`, 
which makes the test brittle to refactors and can break under stronger module 
encapsulation. Prefer asserting via a `@VisibleForTesting` accessor on 
`ReplicationSupervisor` (or public metrics) instead of reflective access.



##########
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/GrpcClientMetricsInterceptor.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm;
+
+import org.apache.ratis.thirdparty.io.grpc.CallOptions;
+import org.apache.ratis.thirdparty.io.grpc.Channel;
+import org.apache.ratis.thirdparty.io.grpc.ClientCall;
+import org.apache.ratis.thirdparty.io.grpc.ClientInterceptor;
+import 
org.apache.ratis.thirdparty.io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
+import 
org.apache.ratis.thirdparty.io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
+import org.apache.ratis.thirdparty.io.grpc.Metadata;
+import org.apache.ratis.thirdparty.io.grpc.MethodDescriptor;
+import org.apache.ratis.thirdparty.io.grpc.Status;
+
+/**
+ * Interceptor to capture gRPC level metrics.
+ */
+public class GrpcClientMetricsInterceptor implements ClientInterceptor {
+
+  private final XceiverClientMetrics metrics;
+
+  public GrpcClientMetricsInterceptor(XceiverClientMetrics metrics) {
+    this.metrics = metrics;
+  }

Review Comment:
   `GrpcClientMetricsInterceptor` is added but (in this PR) is not referenced 
anywhere in the codebase, so the new gRPC failure counters will never increment 
in production. Either wire the interceptor into the relevant gRPC channel/stub 
creation, or drop the interceptor until it is used.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to