devmadhuu commented on code in PR #10162:
URL: https://github.com/apache/ozone/pull/10162#discussion_r3180791918


##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ExportJobManager.java:
##########
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.api;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.PreDestroy;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.Archiver;
+import org.apache.hadoop.ozone.recon.ReconServerConfigKeys;
+import org.apache.hadoop.ozone.recon.api.types.ExportJob;
+import org.apache.hadoop.ozone.recon.api.types.ExportJob.JobStatus;
+import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManager;
+import org.apache.ozone.recon.schema.ContainerSchemaDefinition;
+import 
org.apache.ozone.recon.schema.generated.tables.records.UnhealthyContainersRecord;
+import org.jooq.Cursor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages asynchronous CSV export jobs.
+ */
+@Singleton
+public class ExportJobManager {
+  private static final Logger LOG = 
LoggerFactory.getLogger(ExportJobManager.class);
+  private static final int MAX_QUEUE_SIZE = 4;
+  
+  private final Map<String, ExportJob> jobTracker = new ConcurrentHashMap<>();
+  private final LinkedHashMap<String, ExportJob> jobQueue = new 
LinkedHashMap<>();
+  private final Map<String, Future<?>> runningTasks = new 
ConcurrentHashMap<>();
+  private final ExecutorService workerPool;
+  private final ContainerHealthSchemaManager containerHealthSchemaManager;
+  private final String exportDirectory;
+
+  @Inject
+  public ExportJobManager(ContainerHealthSchemaManager 
containerHealthSchemaManager,
+                          OzoneConfiguration conf) {
+    this.containerHealthSchemaManager = containerHealthSchemaManager;
+    
+    // Use single thread executor for sequential processing (no concurrent DB 
access)
+    this.workerPool = Executors.newSingleThreadExecutor();
+    
+    this.exportDirectory = conf.get(
+        ReconServerConfigKeys.OZONE_RECON_EXPORT_DIRECTORY,
+        ReconServerConfigKeys.OZONE_RECON_EXPORT_DIRECTORY_DEFAULT);
+    
+    // Create export directory if it doesn't exist
+    try {
+      Files.createDirectories(Paths.get(exportDirectory));
+    } catch (IOException e) {
+      LOG.error("Failed to create export directory: {}", exportDirectory, e);
+    }
+    
+    LOG.info("ExportJobManager initialized with single-threaded queue (max {} 
jobs)", MAX_QUEUE_SIZE);
+  }
+
+  public synchronized String submitJob(String userId, String state, int limit, 
long prevKey) {
+    // Reject duplicate: same state already queued or running
+    boolean stateAlreadyActive = jobQueue.values().stream().anyMatch(j -> 
j.getState().equals(state)) ||
+        jobTracker.values().stream().anyMatch(j -> j.getState().equals(state) 
&& j.getStatus() == JobStatus.RUNNING);
+    if (stateAlreadyActive) {
+      throw new IllegalStateException(
+          "An export for state " + state + " is already queued or running. 
Please wait for it to complete.");
+    }
+
+    // Check global queue size limit
+    synchronized (jobQueue) {

Review Comment:
   This is very confusing. This method acquires `this` object lock, then lock 
over `jobQueue`. Any other thread that acquires jobQueue first and then tries 
to call a synchronized method creates a deadlock condition.



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ExportJobManager.java:
##########
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.api;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.PreDestroy;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.Archiver;
+import org.apache.hadoop.ozone.recon.ReconServerConfigKeys;
+import org.apache.hadoop.ozone.recon.api.types.ExportJob;
+import org.apache.hadoop.ozone.recon.api.types.ExportJob.JobStatus;
+import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManager;
+import org.apache.ozone.recon.schema.ContainerSchemaDefinition;
+import 
org.apache.ozone.recon.schema.generated.tables.records.UnhealthyContainersRecord;
+import org.jooq.Cursor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages asynchronous CSV export jobs.
+ */
+@Singleton
+public class ExportJobManager {

Review Comment:
   Add some unit tests for this class



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java:
##########
@@ -253,6 +253,40 @@ public final class  ReconServerConfigKeys {
       "ozone.recon.scm.container.id.batch.size";
   public static final long OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE_DEFAULT = 
1_000_000;
 
+  /**
+   * JDBC fetch size for CSV exports.
+   * Default: 10,000 rows per fetch
+   */
+  public static final String OZONE_RECON_UNHEALTHY_CONTAINER_FETCH_SIZE =
+      "ozone.recon.unhealthy.container.fetch.size";
+  public static final int OZONE_RECON_UNHEALTHY_CONTAINER_FETCH_SIZE_DEFAULT = 
10_000;
+
+  /**
+   * Worker thread pool size for async CSV exports.
+   * Single-threaded to avoid concurrent database access.
+   * Default: 1
+   */
+  public static final String OZONE_RECON_EXPORT_WORKER_THREADS =
+      "ozone.recon.export.worker.threads";
+  public static final int OZONE_RECON_EXPORT_WORKER_THREADS_DEFAULT = 1;
+
+  /**
+   * Max export jobs in queue (global limit).
+   * Jobs beyond this limit will be rejected.
+   * Default: 10
+   */
+  public static final String OZONE_RECON_EXPORT_MAX_JOBS_TOTAL =
+      "ozone.recon.export.max.jobs.total";
+  public static final int OZONE_RECON_EXPORT_MAX_JOBS_TOTAL_DEFAULT = 10;

Review Comment:
   Are these used anywhere ? Also contradicts with your thread queue size ?



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java:
##########
@@ -253,6 +253,40 @@ public final class  ReconServerConfigKeys {
       "ozone.recon.scm.container.id.batch.size";
   public static final long OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE_DEFAULT = 
1_000_000;
 
+  /**
+   * JDBC fetch size for CSV exports.
+   * Default: 10,000 rows per fetch
+   */
+  public static final String OZONE_RECON_UNHEALTHY_CONTAINER_FETCH_SIZE =
+      "ozone.recon.unhealthy.container.fetch.size";
+  public static final int OZONE_RECON_UNHEALTHY_CONTAINER_FETCH_SIZE_DEFAULT = 
10_000;
+
+  /**
+   * Worker thread pool size for async CSV exports.
+   * Single-threaded to avoid concurrent database access.
+   * Default: 1
+   */
+  public static final String OZONE_RECON_EXPORT_WORKER_THREADS =
+      "ozone.recon.export.worker.threads";

Review Comment:
   Is this used ?



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/persistence/ContainerHealthSchemaManager.java:
##########
@@ -395,6 +396,82 @@ public void clearAllUnhealthyContainerRecords() {
     }
   }
 
+  /**
+   * Returns a streaming cursor over unhealthy container records for a given 
state.
+   * Caller MUST close the cursor.
+   *
+   * Generated SQL example (50,000 MISSING containers, starting after 
container ID 12345):
+   *
+   * SELECT * FROM unhealthy_containers
+   * WHERE container_state = 'MISSING'
+   *   AND container_id > 12345
+   * ORDER BY container_id ASC
+   * LIMIT 50000
+   *
+   * @param state filter by state (required)
+   * @param limit max records to return, -1 = unlimited
+   * @param prevKey previous container ID to skip, for cursor-based pagination
+   * @return Cursor returning UnhealthyContainersRecord
+   */
+  /**
+   * Get the total count of unhealthy containers for a given state.
+   *
+   * @param state The container health state to filter by
+   * @param limit Maximum number of records to count (-1 for unlimited)
+   * @param prevKey Container ID offset for cursor-based pagination
+   * @return Total count of matching containers
+   */
+  public long getUnhealthyContainersCount(

Review Comment:
   Check javadoc above this method. Seems something wrong.



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/persistence/ContainerHealthSchemaManager.java:
##########
@@ -395,6 +396,82 @@ public void clearAllUnhealthyContainerRecords() {
     }
   }
 
+  /**
+   * Returns a streaming cursor over unhealthy container records for a given 
state.
+   * Caller MUST close the cursor.
+   *
+   * Generated SQL example (50,000 MISSING containers, starting after 
container ID 12345):
+   *
+   * SELECT * FROM unhealthy_containers
+   * WHERE container_state = 'MISSING'
+   *   AND container_id > 12345
+   * ORDER BY container_id ASC
+   * LIMIT 50000
+   *
+   * @param state filter by state (required)
+   * @param limit max records to return, -1 = unlimited
+   * @param prevKey previous container ID to skip, for cursor-based pagination
+   * @return Cursor returning UnhealthyContainersRecord
+   */
+  /**
+   * Get the total count of unhealthy containers for a given state.
+   *
+   * @param state The container health state to filter by
+   * @param limit Maximum number of records to count (-1 for unlimited)
+   * @param prevKey Container ID offset for cursor-based pagination
+   * @return Total count of matching containers
+   */
+  public long getUnhealthyContainersCount(
+      UnHealthyContainerStates state, int limit, long prevKey) {
+    DSLContext dslContext = containerSchemaDefinition.getDSLContext();
+    
+    Condition whereCondition = 
UNHEALTHY_CONTAINERS.CONTAINER_STATE.eq(state.toString());
+    
+    if (prevKey > 0) {
+      whereCondition = 
whereCondition.and(UNHEALTHY_CONTAINERS.CONTAINER_ID.gt(prevKey));
+    }
+    
+    long totalCount = dslContext.selectCount()
+        .from(UNHEALTHY_CONTAINERS)
+        .where(whereCondition)
+        .fetchOne(0, long.class);
+    
+    // If limit is set and less than total, return the limit as estimated total
+    if (limit > 0 && limit < totalCount) {
+      return limit;
+    }
+    
+    return totalCount;
+  }
+
+  public Cursor<UnhealthyContainersRecord> getUnhealthyContainersCursor(
+      UnHealthyContainerStates state, int limit, long prevKey) {
+    DSLContext dslContext = containerSchemaDefinition.getDSLContext();
+    SelectQuery<UnhealthyContainersRecord> query = 
dslContext.selectFrom(UNHEALTHY_CONTAINERS).getQuery();
+
+    // WHERE container_state = ?
+    
query.addConditions(UNHEALTHY_CONTAINERS.CONTAINER_STATE.eq(state.toString()));
+
+    if (prevKey > 0) {
+      // AND container_id > ?  (cursor-based pagination)
+      query.addConditions(UNHEALTHY_CONTAINERS.CONTAINER_ID.gt(prevKey));
+    }
+
+    // ORDER BY container_id ASC — matches composite index (state, 
container_id),
+    // so Derby walks it in order with no sort step.
+    query.addOrderBy(UNHEALTHY_CONTAINERS.CONTAINER_ID.asc());
+
+    if (limit > 0) {
+      query.addLimit(limit);
+    }
+
+    // Controls how many rows Derby returns per JDBC round-trip.
+    // Default is 10,000 rows.
+    query.fetchSize(10000);

Review Comment:
   This is hardcoded again. In old PR , it was fixed.



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ExportJobManager.java:
##########
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.api;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.PreDestroy;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.Archiver;
+import org.apache.hadoop.ozone.recon.ReconServerConfigKeys;
+import org.apache.hadoop.ozone.recon.api.types.ExportJob;
+import org.apache.hadoop.ozone.recon.api.types.ExportJob.JobStatus;
+import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManager;
+import org.apache.ozone.recon.schema.ContainerSchemaDefinition;
+import 
org.apache.ozone.recon.schema.generated.tables.records.UnhealthyContainersRecord;
+import org.jooq.Cursor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages asynchronous CSV export jobs.
+ */
+@Singleton
+public class ExportJobManager {
+  private static final Logger LOG = 
LoggerFactory.getLogger(ExportJobManager.class);
+  private static final int MAX_QUEUE_SIZE = 4;

Review Comment:
   Better put a comment here, why hardcoded as 4 ?



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/ExportJob.java:
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.api.types;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Represents an asynchronous CSV export job.
+ */
+public class ExportJob {
+  
+  public enum JobStatus {
+    QUEUED,      // Waiting for worker thread
+    RUNNING,     // Actively exporting
+    COMPLETED,   // File ready for download
+    FAILED       // Error occurred
+  }
+  
+  @JsonProperty("jobId")
+  private String jobId;
+  
+  @JsonProperty("userId")
+  private String userId;
+  
+  @JsonProperty("state")
+  private String state;
+  
+  @JsonProperty("limit")
+  private int limit;
+  
+  @JsonProperty("prevKey")
+  private long prevKey;
+  
+  @JsonProperty("status")
+  private JobStatus status;
+  
+  @JsonProperty("submittedAt")
+  private long submittedAt;
+  
+  @JsonProperty("startedAt")
+  private long startedAt;
+  
+  @JsonProperty("completedAt")
+  private long completedAt;
+  
+  @JsonProperty("totalRecords")
+  private long totalRecords;
+  
+  @JsonProperty("estimatedTotal")
+  private long estimatedTotal;
+  
+  @JsonProperty("filePath")
+  private String filePath;
+  
+  @JsonProperty("errorMessage")
+  private String errorMessage;
+  
+  @JsonProperty("progressPercent")
+  private int progressPercent;
+  
+  @JsonProperty("queuePosition")
+  private int queuePosition;
+
+  public ExportJob(String jobId, String userId, String state, int limit, long 
prevKey) {
+    this.jobId = jobId;
+    this.userId = userId;
+    this.state = state;
+    this.limit = limit;
+    this.prevKey = prevKey;
+    this.status = JobStatus.QUEUED;
+    this.submittedAt = System.currentTimeMillis();
+    this.totalRecords = 0;
+    this.estimatedTotal = -1;
+  }
+
+  public String getJobId() {
+    return jobId;
+  }
+
+  public String getUserId() {
+    return userId;
+  }
+
+  public String getState() {
+    return state;
+  }
+
+  public int getLimit() {
+    return limit;
+  }
+
+  public long getPrevKey() {
+    return prevKey;
+  }
+
+  public JobStatus getStatus() {
+    return status;
+  }
+
+  public void setStatus(JobStatus status) {
+    this.status = status;
+    if (status == JobStatus.RUNNING && startedAt == 0) {
+      startedAt = System.currentTimeMillis();
+    } else if ((status == JobStatus.COMPLETED || status == JobStatus.FAILED) 
&& completedAt == 0) {
+      completedAt = System.currentTimeMillis();
+    }
+  }
+
+  public long getSubmittedAt() {
+    return submittedAt;
+  }
+
+  public long getStartedAt() {
+    return startedAt;
+  }
+
+  public long getCompletedAt() {
+    return completedAt;
+  }
+
+  public long getTotalRecords() {
+    return totalRecords;
+  }
+
+  public void setTotalRecords(long totalRecords) {
+    this.totalRecords = totalRecords;
+  }
+
+  public void incrementTotalRecords() {

Review Comment:
   Not sure, what is the purpose of this. The current code passes a local long 
`totalRecords` counter and calls `setTotalRecords` on every row. Using 
`incrementTotalRecords()` removes the local counter 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to