rakeshadr commented on code in PR #10497:
URL: https://github.com/apache/ozone/pull/10497#discussion_r3435171838
##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/PendingContainerTracker.java:
##########
@@ -208,11 +213,45 @@ public boolean checkSpaceAndRecordAllocation(DatanodeInfo datanodeInfo, Containe
}
/**
- * Remove a pending container allocation from a specific DataNode.
- * Removes from both current and previous windows.
- * Called when container is confirmed.
+ * Returns true if the given datanode has at least one allocatable container slot
+ * available, accounting for pending in-flight allocations.
+ *
+ * <p>Slot availability is based on {@code maxContainerSize}: a slot exists for each
+ * {@code maxContainerSize}-worth of usable space on any volume. This read-only check
+ * is intended for the placement policy and does not consume a slot.
+ *
+ * @param datanodeInfo the datanode to check
+ * @return true if at least one container slot is available
+ */
+ public boolean hasAvailableSpace(DatanodeInfo datanodeInfo) {
+ Objects.requireNonNull(datanodeInfo, "datanodeInfo == null");
+ List<StorageReportProto> storageReports = datanodeInfo.getStorageReports();
+ if (storageReports.isEmpty()) {
+ return false;
+ }
+ TwoWindowBucket bucket = datanodeInfo.getPendingContainerAllocations();
+ bucket.rollIfNeeded();
+ final int pendingCount = bucket.getCount();
+ long allocatableCount = 0;
+ for (StorageReportProto report : storageReports) {
+ allocatableCount += Math.max(0L, VolumeUsage.getUsableSpace(report)) / maxContainerSize;
Review Comment:
Defensive coding: explicitly skip failed volumes.
IIUC, when a volume has failed, the DN sends remaining=0, committed=0 and freeSpaceToSpare=0. That makes `VolumeUsage.getUsableSpace(report)` return `0 - 0 - 0 = 0`, so `Math.max(0L, 0) / maxContainerSize = 0`.
Still, it's good to safeguard this explicitly: if a DN-side regression ever reports a stale non-zero `remaining` value for a failed volume, this check would give a false positive.
```
if (report.hasFailed() && report.getFailed()) {
continue;
}
```
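For reference only, a minimal self-contained sketch of the per-volume loop with the suggested guard. `VolumeReport` is a hypothetical stand-in for `StorageReportProto` carrying only the fields the check reads, and `usableSpace()` assumes the `remaining - committed - freeSpaceToSpare` formula described above rather than calling the real `VolumeUsage.getUsableSpace`; `SlotCheckSketch` is likewise a made-up name.

```java
import java.util.List;

/** Hypothetical stand-in for StorageReportProto: only the fields this check reads. */
final class VolumeReport {
  final boolean failed;
  final long remaining;
  final long committed;
  final long freeSpaceToSpare;

  VolumeReport(boolean failed, long remaining, long committed, long freeSpaceToSpare) {
    this.failed = failed;
    this.remaining = remaining;
    this.committed = committed;
    this.freeSpaceToSpare = freeSpaceToSpare;
  }

  /** Assumed to match VolumeUsage.getUsableSpace: remaining - committed - freeSpaceToSpare. */
  long usableSpace() {
    return remaining - committed - freeSpaceToSpare;
  }
}

final class SlotCheckSketch {
  private SlotCheckSketch() {
  }

  /**
   * Returns true if the non-failed volumes offer more container-sized slots
   * than there are pending allocations.
   */
  static boolean hasAvailableSlot(List<VolumeReport> reports, int pendingCount,
      long maxContainerSize) {
    long allocatableCount = 0;
    for (VolumeReport report : reports) {
      if (report.failed) {
        // Skip failed volumes outright instead of trusting their space figures.
        continue;
      }
      // Clamp at zero so an over-committed volume cannot cancel out
      // slots found on healthy volumes.
      allocatableCount += Math.max(0L, report.usableSpace()) / maxContainerSize;
      if (allocatableCount > pendingCount) {
        return true;
      }
    }
    return false;
  }
}
```

With the guard, a failed volume that still carries a stale `remaining` contributes nothing. Without it, a failed volume reporting `remaining=1000` against a 100-byte container size would add ten phantom slots.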
##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/PendingContainerTracker.java:
##########
@@ -156,7 +160,8 @@ synchronized boolean checkSpaceAndAdd(
final int pendingAllocationCount = getCount();
long allocatableCount = 0;
for (StorageReportProto report : storageReports) {
- final long allocatableCountOnThisDisk = VolumeUsage.getUsableSpace(report) / maxContainerSize;
+ final long allocatableCountOnThisDisk =
Review Comment:
Same [comment](https://github.com/apache/ozone/pull/10497/changes#r3435171838) here as well.
Defensive coding: explicitly skip failed volumes.
```
if (report.hasFailed() && report.getFailed()) {
continue;
}
```
##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/PendingContainerTracker.java:
##########
@@ -105,7 +105,11 @@ synchronized void rollIfNeeded() {
previousWindow.clear();
currentWindow.clear();
lastRollTime = now;
- LOG.debug("Double roll interval elapsed ({}ms): dropped {} pending containers", elapsed, dropped);
+ if (dropped > 0) {
+ LOG.warn("PendingContainerTracker: force-dropped {} unconfirmed pending containers "
+ + "on DN {} after {}ms (2x rollInterval). "
+ + "Container reports may have been lost.", dropped, datanodeID, elapsed);
Review Comment:
Can we add a metric as well? It's good to monitor the lost-report case: "containers dropped after 2× roll."
```
if (metrics != null) {
metrics.incNumPendingContainersForceDropped(dropped);
}
```
##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/PendingContainerTracker.java:
##########
@@ -208,11 +213,45 @@ public boolean checkSpaceAndRecordAllocation(DatanodeInfo datanodeInfo, Containe
}
/**
- * Remove a pending container allocation from a specific DataNode.
- * Removes from both current and previous windows.
- * Called when container is confirmed.
+ * Returns true if the given datanode has at least one allocatable container slot
+ * available, accounting for pending in-flight allocations.
+ *
+ * <p>Slot availability is based on {@code maxContainerSize}: a slot exists for each
+ * {@code maxContainerSize}-worth of usable space on any volume. This read-only check
+ * is intended for the placement policy and does not consume a slot.
Review Comment:
Modify the javadoc from:
`This read-only check is intended for the placement policy and does not consume a slot.`
to:
`This check is intended for the placement policy. This rolls expired-window entries but does not consume a slot.`
##########
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java:
##########
@@ -461,6 +461,14 @@ public boolean checkSpaceAndRecordAllocation(DatanodeInfo datanodeInfo, Containe
return pendingContainerTracker.checkSpaceAndRecordAllocation(datanodeInfo, containerID);
}
+ @Override
+ public boolean hasAvailableSpace(DatanodeInfo datanodeInfo) {
+ if (datanodeInfo == null) {
Review Comment:
Please remove this null check here.
_PendingContainerTracker#hasAvailableSpace()_ already has a null check:
`Objects.requireNonNull(datanodeInfo, "datanodeInfo == null");`
##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java:
##########
Review Comment:
Add a metric `totalPendingContainerSlots`: the real-time, cluster-wide count of in-flight (pending) container allocations.
Sample code snippet:
```
private void nodeSpaceStatistics(Map<String, String> nodeStatics) {
if (nodeStateManager.getAllNodes().isEmpty()) {
return;
}
long capacityByte = 0;
long scmUsedByte = 0;
long remainingByte = 0;
long totalPending = 0; // add
for (DatanodeInfo dni : nodeStateManager.getAllNodes()) {
totalPending += dni.getPendingContainerAllocations().getCount(); // add
List<StorageReportProto> storageReports = dni.getStorageReports();
if (storageReports != null && !storageReports.isEmpty()) {
for (StorageReportProto storageReport : storageReports) {
capacityByte += storageReport.getCapacity();
scmUsedByte += storageReport.getScmUsed();
remainingByte += storageReport.getRemaining();
}
}
}
metrics.setTotalPendingContainerSlots(totalPending); // add
```
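One design point worth keeping in mind: `totalPendingContainerSlots` is a point-in-time value, so it behaves like a gauge that `nodeSpaceStatistics` overwrites on every pass, whereas the force-dropped, aged-out and skipped-full-node values suggested in these comments are monotonically increasing counters. The plain-Java class below is a hypothetical stand-in that uses the method names from this review; the real `SCMNodeMetrics` would presumably register Hadoop metrics2 types such as `MutableCounterLong` and `MutableGaugeLong` instead.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical stand-in for the SCMNodeMetrics methods named in this review. */
final class PendingContainerMetricsSketch {
  // Counters: only ever incremented, so rates and totals can be derived from them.
  private final LongAdder forceDropped = new LongAdder();
  private final LongAdder agedOut = new LongAdder();
  private final LongAdder skippedFullNode = new LongAdder();
  // Gauge: replaced wholesale with the latest cluster-wide total on each sampling pass.
  private final AtomicLong totalPendingContainerSlots = new AtomicLong();

  void incNumPendingContainersForceDropped(int count) {
    forceDropped.add(count);
  }

  void incNumPendingContainersAgedOut(int count) {
    agedOut.add(count);
  }

  void incNumSkippedFullNodeContainerAllocation() {
    skippedFullNode.increment();
  }

  void setTotalPendingContainerSlots(long total) {
    totalPendingContainerSlots.set(total);
  }

  long getNumPendingContainersForceDropped() {
    return forceDropped.sum();
  }

  long getNumPendingContainersAgedOut() {
    return agedOut.sum();
  }

  long getNumSkippedFullNodeContainerAllocation() {
    return skippedFullNode.sum();
  }

  long getTotalPendingContainerSlots() {
    return totalPendingContainerSlots.get();
  }
}
```

Incrementing the gauge instead of setting it would double-count every node on each statistics pass, which is why the snippet above computes `totalPending` from scratch and then calls a setter.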
##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/PendingContainerTracker.java:
##########
@@ -208,12 +208,38 @@ public boolean checkSpaceAndRecordAllocation(DatanodeInfo datanodeInfo, Containe
}
/**
- * Remove a pending container allocation from a specific DataNode.
- * Removes from both current and previous windows.
- * Called when container is confirmed.
+ * Returns true if the given datanode has at least one allocatable container slot
+ * available, accounting for pending in-flight allocations.
*
- * @param containerID The container to remove from pending
+ * Uses the same slot-counting logic as {@link TwoWindowBucket#checkSpaceAndAdd}
+ *
+ * <p>This method does not consume a slot — it is a read-only check intended
+ * for the placement policy
+ *
+ * @param datanodeInfo the datanode to check
+ * @return true if at least one container slot is available
*/
+ public boolean hasAvailableSpace(DatanodeInfo datanodeInfo) {
+ Objects.requireNonNull(datanodeInfo, "datanodeInfo == null");
+ List<StorageReportProto> storageReports = datanodeInfo.getStorageReports();
+ if (storageReports.isEmpty()) {
+ return false;
+ }
+ TwoWindowBucket bucket = datanodeInfo.getPendingContainerAllocations();
+ bucket.rollIfNeeded();
+ final int pendingCount = bucket.getCount();
+ long allocatableCount = 0;
+ for (StorageReportProto report : storageReports) {
+ allocatableCount += VolumeUsage.getUsableSpace(report) / maxContainerSize;
+ if (allocatableCount > pendingCount) {
+ return true;
+ }
+ }
+ LOG.debug("Datanode {} has no available container slots. Pending: {}, Allocatable: {}",
+ datanodeInfo.getID(), pendingCount, allocatableCount);
+ return false;
Review Comment:
Oops, I missed one case: we already capture this metric in
[PendingContainerTracker#checkSpaceAndRecordAllocation()](https://github.com/ashishkumar50/ozone/blob/master/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/PendingContainerTracker.java#L204).
The metric should only fire in `checkSpaceAndRecordAllocation`, where the actual allocation attempt occurs.
Semantics: "we tried to place a container on this DN and skipped it because it was full."
Can you please revert this change by removing the `metrics.incNumSkippedFullNodeContainerAllocation();` call from `hasAvailableSpace()`? Thanks!
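To illustrate the semantics described above: placement may call the advisory check once per candidate DN, so counting a skip there would inflate the metric roughly in proportion to the number of healthy nodes per allocation. The class below is a hypothetical single-DN sketch, not the real tracker: slot capacity is a fixed number instead of being derived from storage reports, window rolling is omitted, and a plain field stands in for `incNumSkippedFullNodeContainerAllocation()`.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical single-DN tracker contrasting the advisory placement check with
 * the allocation gate. Capacity is fixed and window rolling is left out.
 */
final class AllocationGateSketch {
  private final int slots;
  private final Set<Long> pending = new HashSet<>();
  // Stands in for SCMNodeMetrics#incNumSkippedFullNodeContainerAllocation.
  private long skippedFullNode;

  AllocationGateSketch(int slots) {
    this.slots = slots;
  }

  /** Advisory check: may run once per candidate DN, so it records nothing and counts nothing. */
  synchronized boolean hasAvailableSpace() {
    return slots > pending.size();
  }

  /** Allocation gate: records the container if a slot is free, otherwise counts one skip. */
  synchronized boolean checkSpaceAndRecordAllocation(long containerId) {
    if (slots > pending.size()) {
      pending.add(containerId);
      return true;
    }
    skippedFullNode++;
    return false;
  }

  synchronized long skippedFullNodeCount() {
    return skippedFullNode;
  }
}
```

Repeated advisory checks against a full DN leave the counter untouched; only a rejected allocation attempt moves it.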
##########
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestPendingContainerTracker.java:
##########
@@ -376,6 +376,40 @@ public void testMultiVolumeWithCommittedBytes() {
assertFalse(tracker.checkSpaceAndRecordAllocation(dnInfo, containers.get(1)));
}
+ /**
+ * Pending in-flight replications recorded via checkSpaceAndRecordAllocation count against
+ * slots, same as write-path containers. hasAvailableSpace reflects the combined total.
+ */
Review Comment:
Add the missing tests for the tracker metrics.
Note: LLM-generated; please use this as reference code and validate it before adding it.
```
private StorageReportProto createFailedStorageReport(DatanodeInfo dn) {
return HddsTestUtils.createStorageReport(dn.getID(), "/data/failed",
0, 0, 0, StorageTypeProto.DISK, true);
}
private PendingContainerTracker trackerWithMetrics(SCMNodeMetrics metrics) {
return new PendingContainerTracker(MAX_CONTAINER_SIZE,
HddsTestUtils.ROLL_INTERVAL_MS_DEFAULT, metrics);
}
/**
* hasAvailableSpace is a read-only advisory check called by placement policy
* for every candidate DN. Firing numSkippedFullNodeContainerAllocation there
* would inflate the counter O(healthyNodes) per container allocation.
* Only checkSpaceAndRecordAllocation (the actual gate) should fire it.
*/
@Test
public void testSkippedMetricFiresOnlyFromActualAllocationNotFromReadOnlyCheck() {
SCMNodeMetrics metrics = mock(SCMNodeMetrics.class);
PendingContainerTracker t = trackerWithMetrics(metrics);
// fill dn1 to capacity: 1 slot, consume it
dn1.updateStorageReports(List.of(
createStorageReport(dn1, 10 * MAX_CONTAINER_SIZE, MAX_CONTAINER_SIZE, 0)));
t.checkSpaceAndRecordAllocation(dn1, container1); // slot consumed
clearInvocations(metrics);
// read-only check on a full DN — must NOT fire the metric
assertFalse(t.hasAvailableSpace(dn1));
verify(metrics, never()).incNumSkippedFullNodeContainerAllocation();
// actual allocation attempt on a full DN — MUST fire the metric
assertFalse(t.checkSpaceAndRecordAllocation(dn1, container2));
verify(metrics, times(1)).incNumSkippedFullNodeContainerAllocation();
}
@Test
public void testAddedAndRemovedMetricsFireCorrectly() {
SCMNodeMetrics metrics = mock(SCMNodeMetrics.class);
PendingContainerTracker t = trackerWithMetrics(metrics);
assertTrue(t.checkSpaceAndRecordAllocation(dn1, container1));
verify(metrics, times(1)).incNumPendingContainersAdded();
verify(metrics, never()).incNumSkippedFullNodeContainerAllocation();
assertTrue(t.checkSpaceAndRecordAllocation(dn1, container2));
verify(metrics, times(2)).incNumPendingContainersAdded();
t.removePendingAllocation(dn1.getPendingContainerAllocations(), container1);
verify(metrics, times(1)).incNumPendingContainersRemoved();
// removing non-existent — must NOT fire removed metric again
t.removePendingAllocation(dn1.getPendingContainerAllocations(), container1);
verify(metrics, times(1)).incNumPendingContainersRemoved();
}
@Test
@Timeout(10)
public void testForceDropMetricFiresOnDoubleRollInterval() throws InterruptedException {
long rollMs = 100L;
SCMNodeMetrics metrics = mock(SCMNodeMetrics.class);
DatanodeInfo dn = new DatanodeInfo(
MockDatanodeDetails.randomLocalDatanodeDetails(),
NodeStatus.inServiceHealthy(),
null, rollMs);
setupDefaultStorageReport(dn);
PendingContainerTracker t = new PendingContainerTracker(MAX_CONTAINER_SIZE, rollMs, metrics);
assertTrue(t.checkSpaceAndRecordAllocation(dn, container1));
assertTrue(t.checkSpaceAndRecordAllocation(dn, container2));
assertEquals(2, dn.getPendingContainerAllocations().getCount());
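Thread.sleep(2 * rollMs + 50); // skip past the double interval to reach the force-drop branch
// hasAvailableSpace triggers rollIfNeeded → force-drop; with no pending entries left,
// the default storage report (which fit both containers) offers free slots again
assertTrue(t.hasAvailableSpace(dn));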
assertEquals(0, dn.getPendingContainerAllocations().getCount());
verify(metrics, times(1)).incNumPendingContainersForceDropped(2);
verify(metrics, never()).incNumPendingContainersAgedOut(anyInt());
}
@Test
@Timeout(10)
public void testAgedOutMetricFiresOnNormalRoll() throws InterruptedException {
long rollMs = 100L;
SCMNodeMetrics metrics = mock(SCMNodeMetrics.class);
DatanodeInfo dn = new DatanodeInfo(
MockDatanodeDetails.randomLocalDatanodeDetails(),
NodeStatus.inServiceHealthy(),
null, rollMs);
setupDefaultStorageReport(dn);
PendingContainerTracker t = new PendingContainerTracker(MAX_CONTAINER_SIZE, rollMs, metrics);
assertTrue(t.checkSpaceAndRecordAllocation(dn, container1)); // goes into currentWindow
assertEquals(1, dn.getPendingContainerAllocations().getCount());
Thread.sleep(rollMs + 50); // single roll: container1 moves to previousWindow
// trigger roll via hasAvailableSpace (or getPendingContainerAllocations, which rolls)
t.hasAvailableSpace(dn);
assertEquals(1, dn.getPendingContainerAllocations().getCount()); // still visible in previousWindow
Thread.sleep(rollMs + 50); // second roll: previousWindow (holding container1) cleared
t.hasAvailableSpace(dn);
assertEquals(0, dn.getPendingContainerAllocations().getCount());
verify(metrics, times(1)).incNumPendingContainersAgedOut(1);
verify(metrics, never()).incNumPendingContainersForceDropped(anyInt());
}
@Test
public void testRemovalFreesSlotForNextAllocation() {
dn1.updateStorageReports(List.of(
createStorageReport(dn1, 10 * MAX_CONTAINER_SIZE, MAX_CONTAINER_SIZE, 0))); // 1 slot
assertTrue(tracker.checkSpaceAndRecordAllocation(dn1, container1));
assertFalse(tracker.checkSpaceAndRecordAllocation(dn1, container2)); // slot full
tracker.removePendingAllocation(dn1.getPendingContainerAllocations(), container1);
assertTrue(tracker.hasAvailableSpace(dn1)); // slot freed
assertTrue(tracker.checkSpaceAndRecordAllocation(dn1, container2)); // succeeds now
}
```
##########
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestPendingContainerTracker.java:
##########
@@ -376,6 +376,40 @@ public void testMultiVolumeWithCommittedBytes() {
assertFalse(tracker.checkSpaceAndRecordAllocation(dnInfo, containers.get(1)));
}
+ /**
Review Comment:
Can we add the scenarios below to improve coverage?
_Note: LLM-generated; please use this as reference code and validate it before adding it._
```
/**
* A failed volume has remaining=0 by DN convention, but the tracker should
* explicitly skip it (report.getFailed() == true) so a stale non-zero
* remaining value on a failed volume can never grant spurious slots.
*/
@Test
public void testFailedVolumeNotCountedAsAllocatableSlot() {
StorageReportProto failed = createFailedStorageReport(dn1);
StorageReportProto healthy = createStorageReport(dn1,
10 * MAX_CONTAINER_SIZE, MAX_CONTAINER_SIZE, 0); // 1 real slot
dn1.updateStorageReports(List.of(failed, healthy));
assertTrue(tracker.hasAvailableSpace(dn1)); // healthy vol → 1 slot
assertTrue(tracker.checkSpaceAndRecordAllocation(dn1, container1)); // consumes it
assertFalse(tracker.hasAvailableSpace(dn1)); // 0 slots left
assertFalse(tracker.checkSpaceAndRecordAllocation(dn1, container2)); // rejected
}
@Test
public void testAllVolumesFailedReturnsFalse() {
dn1.updateStorageReports(List.of(
createFailedStorageReport(dn1),
createFailedStorageReport(dn1)));
assertFalse(tracker.hasAvailableSpace(dn1));
assertFalse(tracker.checkSpaceAndRecordAllocation(dn1, container1));
}
```
##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/PendingContainerTracker.java:
##########
@@ -105,7 +105,11 @@ synchronized void rollIfNeeded() {
previousWindow.clear();
currentWindow.clear();
lastRollTime = now;
- LOG.debug("Double roll interval elapsed ({}ms): dropped {} pending containers", elapsed, dropped);
+ if (dropped > 0) {
+ LOG.warn("PendingContainerTracker: force-dropped {} unconfirmed pending containers "
+ + "on DN {} after {}ms (2x rollInterval). "
+ + "Container reports may have been lost.", dropped, datanodeID, elapsed);
+ }
} else if (elapsed >= rollIntervalMs) {
previousWindow.clear();
Review Comment:
Can we add a metric for containers aged out by a normal roll?
```
int agedOut = previousWindow.size();
previousWindow.clear();
...
...
...
lastRollTime = now;
if (agedOut > 0 && metrics != null) {
metrics.incNumPendingContainersAgedOut(agedOut);
}
```
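A minimal sketch of where the aged-out and force-dropped counts come from, assuming the normal roll moves `currentWindow` into `previousWindow` (the hunk above is cut off after `previousWindow.clear();`). `TwoWindowBucketSketch`, its `add` method and the injected `LongSupplier` clock are all hypothetical; plain fields stand in for the `SCMNodeMetrics` calls.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.LongSupplier;

/**
 * Hypothetical two-window bucket. Plain fields stand in for SCMNodeMetrics,
 * and the clock is injected so tests can advance time without Thread.sleep.
 */
final class TwoWindowBucketSketch {
  private final long rollIntervalMs;
  private final LongSupplier clock;
  private Set<Long> currentWindow = new HashSet<>();
  private Set<Long> previousWindow = new HashSet<>();
  private long lastRollTime;
  private long agedOut;
  private long forceDropped;

  TwoWindowBucketSketch(long rollIntervalMs, LongSupplier clock) {
    this.rollIntervalMs = rollIntervalMs;
    this.clock = clock;
    this.lastRollTime = clock.getAsLong();
  }

  synchronized void add(long containerId) {
    rollIfNeeded();
    currentWindow.add(containerId);
  }

  synchronized void rollIfNeeded() {
    long now = clock.getAsLong();
    long elapsed = now - lastRollTime;
    if (elapsed >= 2 * rollIntervalMs) {
      // Both windows expired unconfirmed: container reports were likely lost.
      forceDropped += previousWindow.size() + currentWindow.size();
      previousWindow.clear();
      currentWindow.clear();
      lastRollTime = now;
    } else if (elapsed >= rollIntervalMs) {
      // Normal roll: the previous window ages out and the current window
      // becomes the new previous window (the emptied set is reused).
      agedOut += previousWindow.size();
      Set<Long> emptied = previousWindow;
      emptied.clear();
      previousWindow = currentWindow;
      currentWindow = emptied;
      lastRollTime = now;
    }
  }

  synchronized int getCount() {
    return currentWindow.size() + previousWindow.size();
  }

  synchronized long agedOutCount() {
    return agedOut;
  }

  synchronized long forceDroppedCount() {
    return forceDropped;
  }
}
```

An injected clock is one way to make the suggested `testAgedOutMetricFiresOnNormalRoll` and `testForceDropMetricFiresOnDoubleRollInterval` deterministic rather than relying on `Thread.sleep` margins; whether the real `TwoWindowBucket` can accept a clock is an assumption.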