devmadhuu commented on code in PR #9994:
URL: https://github.com/apache/ozone/pull/9994#discussion_r3118728451
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ContainerEndpoint.java:
##########
@@ -450,6 +457,79 @@ private Response getUnhealthyContainersFromSchema(
return Response.ok(response).build();
}
+ /**
+ * Export all unhealthy containers into a CSV file by streaming the results
directly
+ * from the database without holding them in the JVM heap.
+ *
+ * @param state The container state to filter by, or null for all.
+ * @param limit The maximum number of records to return, 0 for unlimited.
+ * @param prevKey The previous container ID to skip, for pagination.
+ * @return {@link Response} containing the CSV StreamingOutput.
+ */
+ @GET
+ @Path("/unhealthy/export")
+ @Produces("text/csv")
+ public Response exportUnhealthyContainers(
+ @QueryParam("state") String state,
+ @DefaultValue("0") @QueryParam(RECON_QUERY_LIMIT) int limit,
Review Comment:
Below co-pilot comment is not valid from our convention. Lets use uniform as
-1 for unlimited. 0 is a valid value.
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ContainerEndpoint.java:
##########
@@ -450,6 +457,79 @@ private Response getUnhealthyContainersFromSchema(
return Response.ok(response).build();
}
+ /**
+ * Export all unhealthy containers into a CSV file by streaming the results
directly
+ * from the database without holding them in the JVM heap.
+ *
+ * @param state The container state to filter by, or null for all.
+ * @param limit The maximum number of records to return, 0 for unlimited.
+ * @param prevKey The previous container ID to skip, for pagination.
+ * @return {@link Response} containing the CSV StreamingOutput.
+ */
+ @GET
+ @Path("/unhealthy/export")
+ @Produces("text/csv")
+ public Response exportUnhealthyContainers(
+ @QueryParam("state") String state,
+ @DefaultValue("0") @QueryParam(RECON_QUERY_LIMIT) int limit,
+ @DefaultValue(PREV_CONTAINER_ID_DEFAULT_VALUE)
@QueryParam(RECON_QUERY_PREVKEY) long prevKey) {
+
+ if (limit < 0) {
+ throw new WebApplicationException("The limit query parameter must be "
+ + "greater than or equal to 0.", Response.Status.BAD_REQUEST);
+ }
+
+ ContainerSchemaDefinition.UnHealthyContainerStates internalState = null;
+ if (StringUtils.isNotEmpty(state)) {
+ try {
+ internalState =
ContainerSchemaDefinition.UnHealthyContainerStates.valueOf(state);
+ } catch (IllegalArgumentException e) {
+ throw new WebApplicationException(e, Response.Status.BAD_REQUEST);
+ }
+ }
+
+ final ContainerSchemaDefinition.UnHealthyContainerStates filterState =
internalState;
+
+ StreamingOutput stream = outputStream -> {
+ try (BufferedOutputStream bos = new BufferedOutputStream(outputStream,
256 * 1024);
+ Cursor<UnhealthyContainersRecord> cursor =
+
containerHealthSchemaManager.getUnhealthyContainersCursor(filterState, limit,
prevKey)) {
+
+ BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(bos,
StandardCharsets.UTF_8));
+
+ // Write CSV header
+ writer.write("container_id,container_state,in_state_since," +
+ "expected_replica_count,actual_replica_count,replica_delta\n");
+
+ StringBuilder sb = new StringBuilder(128);
+ while (cursor.hasNext()) {
+ UnhealthyContainersRecord rec = cursor.fetchNext();
+ sb.setLength(0);
+ sb.append(rec.getContainerId()).append(',')
+ .append(rec.getContainerState()).append(',')
+ .append(rec.getInStateSince()).append(',')
+ .append(rec.getExpectedReplicaCount()).append(',')
+ .append(rec.getActualReplicaCount()).append(',')
+ .append(rec.getReplicaDelta()).append('\n');
+ writer.write(sb.toString());
+ }
+ writer.flush();
+ } catch (Exception e) {
+ LOG.error("Error streaming unhealthy containers CSV", e);
+ throw new WebApplicationException(e,
Response.Status.INTERNAL_SERVER_ERROR);
+ }
+ };
+
+ String filename = String.format("unhealthy_containers_%s_%d.csv",
+ state != null ? state.toLowerCase() : "all",
+ System.currentTimeMillis());
+
+ return Response.ok(stream)
+ .header("Content-Type", "text/csv")
Review Comment:
No need to add content type header again, as you have already annotated the
API response with `@Produces("text/csv")`
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/persistence/ContainerHealthSchemaManager.java:
##########
@@ -381,6 +382,73 @@ public List<UnhealthyContainerRecord>
getUnhealthyContainers(
}
}
+ /**
+ * Returns a streaming cursor over unhealthy container records.
+ * Caller MUST close the cursor.
+ *
+ * Final Complete Query Examples:
+ * Example 1: Export 50,000 MISSING containers, starting after container ID
12345
+ *
+ * SELECT * FROM unhealthy_containers
+ * WHERE container_state = 'MISSING'
+ * AND container_id > 12345
+ * ORDER BY container_id ASC
+ * LIMIT 50000
+ * JDBC will fetch 10,000 rows at a time (5 batches total)
+ *
+ * Example 2: Export all 100,000 containers (all states), no pagination
+ *
+ * SELECT * FROM unhealthy_containers
+ * ORDER BY container_state ASC, container_id ASC
+ * LIMIT 100000
+ * JDBC will fetch 10,000 rows at a time (10 batches total)
+ *
+ * @param state filter by state, or null for all states
+ * @param limit max records to return, 0 = unlimited
+ * @param prevKey previous container ID to skip, for pagination
+ * @return Cursor returning UnhealthyContainersRecord
+ */
+ public Cursor<UnhealthyContainersRecord> getUnhealthyContainersCursor(
+ UnHealthyContainerStates state, int limit, long prevKey) {
+ DSLContext dslContext = containerSchemaDefinition.getDSLContext();
+
+ // In plain SQL: SELECT * FROM unhealthy_containers
+ org.jooq.SelectQuery<UnhealthyContainersRecord> query =
dslContext.selectFrom(UNHEALTHY_CONTAINERS).getQuery();
+
+ if (state != null) {
+ // Filtering by ONE specific state (e.g., state = "MISSING")
+ // WHERE container_state = 'MISSING'
+
query.addConditions(UNHEALTHY_CONTAINERS.CONTAINER_STATE.eq(state.toString()));
+ if (prevKey > 0) {
+ // AND container_id > 12345
+ query.addConditions(UNHEALTHY_CONTAINERS.CONTAINER_ID.gt(prevKey));
+ }
+ // ORDER BY container_id ASC
+ query.addOrderBy(UNHEALTHY_CONTAINERS.CONTAINER_ID.asc());
+ } else {
+ // Exporting ALL states (state = null)
+ if (prevKey > 0) {
+ // WHERE container_id > 12345
+ query.addConditions(UNHEALTHY_CONTAINERS.CONTAINER_ID.gt(prevKey));
+ }
+ // ORDER BY container_state ASC, container_id ASC
+ query.addOrderBy(
+ UNHEALTHY_CONTAINERS.CONTAINER_STATE.asc(),
+ UNHEALTHY_CONTAINERS.CONTAINER_ID.asc()
+ );
Review Comment:
I think, this may not work correctly, lets assume that we have dataset like:
```
State | Container ID
MISSING | 1..500000
OVER_REPLICATED | 1..200000
UNDER_REPLICATED | 1..300000
```
and since you are fetching based on `limit` and `prevKey > lastBatchKey`.
Now lets assume, after batch 1 returns MISSING:1–500000, prevKey = 500000.
batch 2 sends container_id > 500000 — this silently drops all OVER_REPLICATED
and UNDER_REPLICATED containers with ID ≤ 500000, which is the entire
OVER_REPLICATED and UNDER_REPLICATED dataset in this example.
##########
hadoop-ozone/recon/src/main/resources/webapps/recon/ozone-recon-web/src/v2/pages/containers/containers.tsx:
##########
@@ -175,6 +178,99 @@ const Containers: React.FC<{}> = () => {
5: mismatchedReplicaContainerData
}
+ // Mapping tab keys to the backend state parameter for CSV export
+ const tabToExportState: Record<string, string> = {
+ '1': 'MISSING',
+ '2': 'UNDER_REPLICATED',
+ '3': 'OVER_REPLICATED',
+ '4': 'MIS_REPLICATED',
+ '5': 'REPLICA_MISMATCH'
+ };
+
+ // Human-readable labels for the export tooltip
+ const tabToLabel: Record<string, string> = {
+ '1': 'Missing',
+ '2': 'Under-Replicated',
+ '3': 'Over-Replicated',
+ '4': 'Mis-Replicated',
+ '5': 'Mismatched Replicas'
+ };
+
+ const handleExportCsv = useCallback(async () => {
+ const state = tabToExportState[selectedTab];
+ const label = tabToLabel[selectedTab];
+
+ if (exportLimit > 0 && exportLimit <= 500000) {
+ const exportUrl =
`/api/v1/containers/unhealthy/export?state=${state}&limit=${exportLimit}`;
+ window.open(exportUrl, '_blank');
+ message.success(`Exporting ${label} containers as CSV (Limit:
${exportLimit})`);
+ return;
+ }
+
+ const chunkSize = 500000;
+ let prevKey = 0;
+ let part = 1;
+ let totalExported = 0;
+ const hideMessage = message.loading(`Starting export of ${label}
containers in chunks...`, 0);
+
+ try {
+ while (true) {
+ const currentLimit = exportLimit === 0 ? chunkSize :
Math.min(chunkSize, exportLimit - totalExported);
+ if (currentLimit <= 0) break;
+
+ const exportUrl =
`/api/v1/containers/unhealthy/export?state=${state}&limit=${currentLimit}&prevKey=${prevKey}`;
+ const response = await fetch(exportUrl);
+
+ if (!response.ok) {
+ throw new Error(`Failed to fetch part ${part}`);
+ }
+
+ const text = await response.text();
+ const lines = text.trim().split('\n');
+
+ // lines[0] is header. If only header, no data.
+ if (lines.length <= 1) {
+ break;
+ }
+
+ const blob = new Blob([text], { type: 'text/csv' });
+ const url = window.URL.createObjectURL(blob);
+ const a = document.createElement('a');
+ a.href = url;
+ a.download =
`unhealthy_containers_${state.toLowerCase()}_part${part}.csv`;
+ document.body.appendChild(a);
+ a.click();
+ document.body.removeChild(a);
+ window.URL.revokeObjectURL(url);
+
+ const recordsInChunk = lines.length - 1;
+ totalExported += recordsInChunk;
+
+ const lastLine = lines[lines.length - 1];
+ const lastContainerId = parseInt(lastLine.split(',')[0], 10);
Review Comment:
`text.trim() `should handle trailing \n, but if a field value (like reason,
if added) contains an embedded newline, `split('\n') `would produce incorrect
results. A CSV parser should be used rather than manual splitting
##########
hadoop-ozone/recon/src/main/resources/webapps/recon/ozone-recon-web/src/v2/pages/containers/containers.tsx:
##########
@@ -175,6 +178,99 @@ const Containers: React.FC<{}> = () => {
5: mismatchedReplicaContainerData
}
+ // Mapping tab keys to the backend state parameter for CSV export
+ const tabToExportState: Record<string, string> = {
+ '1': 'MISSING',
+ '2': 'UNDER_REPLICATED',
+ '3': 'OVER_REPLICATED',
+ '4': 'MIS_REPLICATED',
+ '5': 'REPLICA_MISMATCH'
+ };
+
+ // Human-readable labels for the export tooltip
+ const tabToLabel: Record<string, string> = {
+ '1': 'Missing',
+ '2': 'Under-Replicated',
+ '3': 'Over-Replicated',
+ '4': 'Mis-Replicated',
+ '5': 'Mismatched Replicas'
+ };
+
+ const handleExportCsv = useCallback(async () => {
+ const state = tabToExportState[selectedTab];
+ const label = tabToLabel[selectedTab];
+
+ if (exportLimit > 0 && exportLimit <= 500000) {
+ const exportUrl =
`/api/v1/containers/unhealthy/export?state=${state}&limit=${exportLimit}`;
+ window.open(exportUrl, '_blank');
Review Comment:
For limit with some fixed bounded value, we are using window.open which
means opening a new window or browser tab ? Did you test , how browser behaves
with limit value ? Because you are using `fetch` for unlimited . In both cases
, we should use `fetch`.
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ContainerEndpoint.java:
##########
@@ -450,6 +457,79 @@ private Response getUnhealthyContainersFromSchema(
return Response.ok(response).build();
}
+ /**
+ * Export all unhealthy containers into a CSV file by streaming the results
directly
+ * from the database without holding them in the JVM heap.
+ *
+ * @param state The container state to filter by, or null for all.
+ * @param limit The maximum number of records to return, 0 for unlimited.
+ * @param prevKey The previous container ID to skip, for pagination.
+ * @return {@link Response} containing the CSV StreamingOutput.
+ */
+ @GET
+ @Path("/unhealthy/export")
+ @Produces("text/csv")
+ public Response exportUnhealthyContainers(
+ @QueryParam("state") String state,
+ @DefaultValue("0") @QueryParam(RECON_QUERY_LIMIT) int limit,
+ @DefaultValue(PREV_CONTAINER_ID_DEFAULT_VALUE)
@QueryParam(RECON_QUERY_PREVKEY) long prevKey) {
+
+ if (limit < 0) {
+ throw new WebApplicationException("The limit query parameter must be "
+ + "greater than or equal to 0.", Response.Status.BAD_REQUEST);
+ }
+
+ ContainerSchemaDefinition.UnHealthyContainerStates internalState = null;
+ if (StringUtils.isNotEmpty(state)) {
+ try {
+ internalState =
ContainerSchemaDefinition.UnHealthyContainerStates.valueOf(state);
+ } catch (IllegalArgumentException e) {
+ throw new WebApplicationException(e, Response.Status.BAD_REQUEST);
+ }
+ }
+
+ final ContainerSchemaDefinition.UnHealthyContainerStates filterState =
internalState;
+
+ StreamingOutput stream = outputStream -> {
+ try (BufferedOutputStream bos = new BufferedOutputStream(outputStream,
256 * 1024);
+ Cursor<UnhealthyContainersRecord> cursor =
+
containerHealthSchemaManager.getUnhealthyContainersCursor(filterState, limit,
prevKey)) {
+
+ BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(bos,
StandardCharsets.UTF_8));
+
+ // Write CSV header
+ writer.write("container_id,container_state,in_state_since," +
+ "expected_replica_count,actual_replica_count,replica_delta\n");
+
+ StringBuilder sb = new StringBuilder(128);
+ while (cursor.hasNext()) {
+ UnhealthyContainersRecord rec = cursor.fetchNext();
+ sb.setLength(0);
+ sb.append(rec.getContainerId()).append(',')
+ .append(rec.getContainerState()).append(',')
+ .append(rec.getInStateSince()).append(',')
+ .append(rec.getExpectedReplicaCount()).append(',')
+ .append(rec.getActualReplicaCount()).append(',')
+ .append(rec.getReplicaDelta()).append('\n');
+ writer.write(sb.toString());
+ }
+ writer.flush();
+ } catch (Exception e) {
+ LOG.error("Error streaming unhealthy containers CSV", e);
+ throw new WebApplicationException(e,
Response.Status.INTERNAL_SERVER_ERROR);
Review Comment:
This is still the issue after your above fix. The
`WebApplicationException(500)` here will never reach the client as a proper
error response. Because this is defined to throw inside lambda function.
`StreamingOutput.write()` is called by the JAX-RS container after it has
already written 200 OK and all headers to the socket. By the time this catch
block can execute, the response is committed — the 500 status cannot be sent
retroactively. The client will simply receive a silently truncated CSV with no
indication that the export failed.
I think the proper fix would be:
```
} catch (IOException e) {
// Client disconnect or network error — expected, log as warn
LOG.warn("Client disconnected during CSV export", e);
throw e;
} catch (Exception e) {
// Unexpected DB/application error
LOG.error("Error streaming unhealthy containers CSV", e);
throw new IOException("Internal error during CSV export", e);
}
```
When `write()` throws `IOException`, the JAX-RS container aborts the
connection cleanly and the browser shows a failed/incomplete download — which
is the correct and honest signal to the UI or client browser.
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ContainerEndpoint.java:
##########
@@ -450,6 +457,79 @@ private Response getUnhealthyContainersFromSchema(
return Response.ok(response).build();
}
+ /**
+ * Export all unhealthy containers into a CSV file by streaming the results
directly
+ * from the database without holding them in the JVM heap.
+ *
+ * @param state The container state to filter by, or null for all.
+ * @param limit The maximum number of records to return, 0 for unlimited.
+ * @param prevKey The previous container ID to skip, for pagination.
+ * @return {@link Response} containing the CSV StreamingOutput.
+ */
+ @GET
+ @Path("/unhealthy/export")
+ @Produces("text/csv")
+ public Response exportUnhealthyContainers(
+ @QueryParam("state") String state,
+ @DefaultValue("0") @QueryParam(RECON_QUERY_LIMIT) int limit,
+ @DefaultValue(PREV_CONTAINER_ID_DEFAULT_VALUE)
@QueryParam(RECON_QUERY_PREVKEY) long prevKey) {
+
+ if (limit < 0) {
+ throw new WebApplicationException("The limit query parameter must be "
+ + "greater than or equal to 0.", Response.Status.BAD_REQUEST);
+ }
+
+ ContainerSchemaDefinition.UnHealthyContainerStates internalState = null;
+ if (StringUtils.isNotEmpty(state)) {
+ try {
+ internalState =
ContainerSchemaDefinition.UnHealthyContainerStates.valueOf(state);
+ } catch (IllegalArgumentException e) {
+ throw new WebApplicationException(e, Response.Status.BAD_REQUEST);
+ }
+ }
+
+ final ContainerSchemaDefinition.UnHealthyContainerStates filterState =
internalState;
+
+ StreamingOutput stream = outputStream -> {
+ try (BufferedOutputStream bos = new BufferedOutputStream(outputStream,
256 * 1024);
+ Cursor<UnhealthyContainersRecord> cursor =
+
containerHealthSchemaManager.getUnhealthyContainersCursor(filterState, limit,
prevKey)) {
+
+ BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(bos,
StandardCharsets.UTF_8));
+
+ // Write CSV header
+ writer.write("container_id,container_state,in_state_since," +
+ "expected_replica_count,actual_replica_count,replica_delta\n");
Review Comment:
Are we intentionally dropping `reason` field ?
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/persistence/ContainerHealthSchemaManager.java:
##########
@@ -381,6 +382,73 @@ public List<UnhealthyContainerRecord>
getUnhealthyContainers(
}
}
+ /**
+ * Returns a streaming cursor over unhealthy container records.
+ * Caller MUST close the cursor.
+ *
+ * Final Complete Query Examples:
+ * Example 1: Export 50,000 MISSING containers, starting after container ID
12345
+ *
+ * SELECT * FROM unhealthy_containers
+ * WHERE container_state = 'MISSING'
+ * AND container_id > 12345
+ * ORDER BY container_id ASC
+ * LIMIT 50000
+ * JDBC will fetch 10,000 rows at a time (5 batches total)
+ *
+ * Example 2: Export all 100,000 containers (all states), no pagination
+ *
+ * SELECT * FROM unhealthy_containers
+ * ORDER BY container_state ASC, container_id ASC
+ * LIMIT 100000
+ * JDBC will fetch 10,000 rows at a time (10 batches total)
+ *
+ * @param state filter by state, or null for all states
+ * @param limit max records to return, 0 = unlimited
+ * @param prevKey previous container ID to skip, for pagination
+ * @return Cursor returning UnhealthyContainersRecord
+ */
+ public Cursor<UnhealthyContainersRecord> getUnhealthyContainersCursor(
+ UnHealthyContainerStates state, int limit, long prevKey) {
+ DSLContext dslContext = containerSchemaDefinition.getDSLContext();
+
+ // In plain SQL: SELECT * FROM unhealthy_containers
+ org.jooq.SelectQuery<UnhealthyContainersRecord> query =
dslContext.selectFrom(UNHEALTHY_CONTAINERS).getQuery();
+
+ if (state != null) {
+ // Filtering by ONE specific state (e.g., state = "MISSING")
+ // WHERE container_state = 'MISSING'
+
query.addConditions(UNHEALTHY_CONTAINERS.CONTAINER_STATE.eq(state.toString()));
+ if (prevKey > 0) {
+ // AND container_id > 12345
+ query.addConditions(UNHEALTHY_CONTAINERS.CONTAINER_ID.gt(prevKey));
+ }
+ // ORDER BY container_id ASC
+ query.addOrderBy(UNHEALTHY_CONTAINERS.CONTAINER_ID.asc());
+ } else {
+ // Exporting ALL states (state = null)
+ if (prevKey > 0) {
+ // WHERE container_id > 12345
+ query.addConditions(UNHEALTHY_CONTAINERS.CONTAINER_ID.gt(prevKey));
+ }
+ // ORDER BY container_state ASC, container_id ASC
+ query.addOrderBy(
+ UNHEALTHY_CONTAINERS.CONTAINER_STATE.asc(),
+ UNHEALTHY_CONTAINERS.CONTAINER_ID.asc()
+ );
+ }
+
+ if (limit > 0) {
+ query.addLimit(limit);
+ }
+
+ // This doesn't change the SQL query.
+ // It tells JDBC: "When you fetch data, bring me 10,000 rows at a time,
not one-by-one"
+ query.fetchSize(10000);
Review Comment:
for 10 million containers, this fetching speed could be very slow.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]