This is an automated email from the ASF dual-hosted git repository.

zitadombi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new f5ed9d364b HDDS-10389. Implement a search feature for users to locate 
open keys within the Open Keys Insights section. (#6231)
f5ed9d364b is described below

commit f5ed9d364b1d31f007cfbff1e683a826c171b858
Author: Arafat2198 <[email protected]>
AuthorDate: Thu Jul 18 16:11:00 2024 +0530

    HDDS-10389. Implement a search feature for users to locate open keys within 
the Open Keys Insights section. (#6231)
---
 .../apache/hadoop/ozone/recon/ReconConstants.java  |   3 +
 .../org/apache/hadoop/ozone/recon/ReconUtils.java  |  53 +-
 .../ozone/recon/api/OMDBInsightEndpoint.java       |  47 +-
 .../ozone/recon/api/OMDBInsightSearchEndpoint.java | 389 ++++++++++
 .../recon/api/TestOMDBInsightSearchEndpoint.java   | 796 +++++++++++++++++++++
 5 files changed, 1241 insertions(+), 47 deletions(-)

diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java
 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java
index 6dbc4746ac..ed657931e0 100644
--- 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java
+++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java
@@ -43,6 +43,7 @@ public final class ReconConstants {
   public static final int DISK_USAGE_TOP_RECORDS_LIMIT = 30;
   public static final String DEFAULT_OPEN_KEY_INCLUDE_NON_FSO = "false";
   public static final String DEFAULT_OPEN_KEY_INCLUDE_FSO = "false";
+  public static final String DEFAULT_START_PREFIX = "/";
   public static final String DEFAULT_FETCH_COUNT = "1000";
   public static final String DEFAULT_KEY_SIZE = "0";
   public static final String DEFAULT_BATCH_NUMBER = "1";
@@ -50,6 +51,8 @@ public final class ReconConstants {
   public static final String RECON_QUERY_PREVKEY = "prevKey";
   public static final String RECON_OPEN_KEY_INCLUDE_NON_FSO = "includeNonFso";
   public static final String RECON_OPEN_KEY_INCLUDE_FSO = "includeFso";
+  public static final String RECON_OPEN_KEY_DEFAULT_SEARCH_LIMIT = "1000";
+  public static final String RECON_OPEN_KEY_SEARCH_DEFAULT_PREV_KEY = "";
   public static final String RECON_QUERY_FILTER = "missingIn";
   public static final String PREV_CONTAINER_ID_DEFAULT_VALUE = "0";
   public static final String PREV_DELETED_BLOCKS_TRANSACTION_ID_DEFAULT_VALUE =
diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java
 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java
index e346b4bc9e..5c9f6a5f4e 100644
--- 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java
+++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java
@@ -32,10 +32,11 @@ import java.sql.Timestamp;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.time.Instant;
-import java.util.ArrayList;
-import java.util.Date;
 import java.util.List;
 import java.util.TimeZone;
+import java.util.Date;
+import java.util.Set;
+import java.util.ArrayList;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -595,6 +596,54 @@ public class ReconUtils {
     }
   }
 
+  /**
+   * Finds all subdirectories under a parent directory in an FSO bucket. It 
builds
+   * a list of paths for these subdirectories. These sub-directories are then 
used
+   * to search for open files in the openFileTable.
+   *
+   * How it works:
+   * - Starts from a parent directory identified by parentId.
+   * - Looks through all child directories of this parent.
+   * - For each child, it creates a path that starts with 
volumeID/bucketID/parentId,
+   *   following our openFileTable format.
+   * - Adds these paths to a list and explores each child further for more 
subdirectories.
+   *
+   * @param parentId The ID of the parent directory from which to start 
gathering subdirectories.
+   * @param subPaths The list to which the paths of subdirectories will be 
added.
+   * @param volumeID The ID of the volume containing the parent directory.
+   * @param bucketID The ID of the bucket containing the parent directory.
+   * @param reconNamespaceSummaryManager The manager used to retrieve 
NSSummary objects.
+   * @throws IOException If an I/O error occurs while fetching NSSummary 
objects.
+   */
+  public static void gatherSubPaths(long parentId, List<String> subPaths,
+                              long volumeID, long bucketID,
+                              ReconNamespaceSummaryManager 
reconNamespaceSummaryManager)
+      throws IOException {
+    // Fetch the NSSummary object for parentId
+    NSSummary parentSummary =
+        reconNamespaceSummaryManager.getNSSummary(parentId);
+    if (parentSummary == null) {
+      return;
+    }
+
+    Set<Long> childDirIds = parentSummary.getChildDir();
+    for (Long childId : childDirIds) {
+      // Fetch the NSSummary for each child directory
+      NSSummary childSummary =
+          reconNamespaceSummaryManager.getNSSummary(childId);
+      if (childSummary != null) {
+        String subPath =
+            ReconUtils.constructObjectPathWithPrefix(volumeID, bucketID,
+                childId);
+        // Add to subPaths
+        subPaths.add(subPath);
+        // Recurse into this child directory
+        gatherSubPaths(childId, subPaths, volumeID, bucketID,
+            reconNamespaceSummaryManager);
+      }
+    }
+  }
+
   /**
    * Validates volume or bucket names according to specific rules.
    *
diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/OMDBInsightEndpoint.java
 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/OMDBInsightEndpoint.java
index f4aaf50dfc..3f95c04fc9 100644
--- 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/OMDBInsightEndpoint.java
+++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/OMDBInsightEndpoint.java
@@ -61,7 +61,6 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.TimeZone;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
@@ -1061,7 +1060,8 @@ public class OMDBInsightEndpoint {
       subPaths.add(startPrefixObjectPath);
 
       // Recursively gather all subpaths
-      gatherSubPaths(parentId, subPaths, Long.parseLong(names[0]), 
Long.parseLong(names[1]));
+      ReconUtils.gatherSubPaths(parentId, subPaths, Long.parseLong(names[0]),
+          Long.parseLong(names[1]), reconNamespaceSummaryManager);
       // Iterate over the subpaths and retrieve the files
       for (String subPath : subPaths) {
         paramInfo.setStartPrefix(subPath);
@@ -1082,49 +1082,6 @@ public class OMDBInsightEndpoint {
     return matchedKeys;
   }
 
-  /**
-   * Finds all subdirectories under a parent directory in an FSO bucket. It 
builds
-   * a list of paths for these subdirectories. These sub-directories are then 
used
-   * to search for files in the fileTable.
-   * <p>
-   * How it works:
-   * - Starts from a parent directory identified by parentId.
-   * - Looks through all child directories of this parent.
-   * - For each child, it creates a path that starts with 
volumeID/bucketID/parentId,
-   * following our fileTable format
-   * - Adds these paths to a list and explores each child further for more 
subdirectories.
-   *
-   * @param parentId The ID of the directory we start exploring from.
-   * @param subPaths A list where we collect paths to all subdirectories.
-   * @param volumeID
-   * @param bucketID
-   * @throws IOException If there are problems accessing directory information.
-   */
-  private void gatherSubPaths(long parentId, List<String> subPaths,
-                              long volumeID, long bucketID) throws IOException 
{
-    // Fetch the NSSummary object for parentId
-    NSSummary parentSummary =
-        reconNamespaceSummaryManager.getNSSummary(parentId);
-    if (parentSummary == null) {
-      return;
-    }
-
-    Set<Long> childDirIds = parentSummary.getChildDir();
-    for (Long childId : childDirIds) {
-      // Fetch the NSSummary for each child directory
-      NSSummary childSummary =
-          reconNamespaceSummaryManager.getNSSummary(childId);
-      if (childSummary != null) {
-        String subPath =
-            ReconUtils.constructObjectPathWithPrefix(volumeID, bucketID, 
childId);
-        // Add to subPaths
-        subPaths.add(subPath);
-        // Recurse into this child directory
-        gatherSubPaths(childId, subPaths, volumeID, bucketID);
-      }
-    }
-  }
-
 
   /**
    * Converts a startPrefix path into an objectId path for FSO buckets, using 
IDs.
diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/OMDBInsightSearchEndpoint.java
 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/OMDBInsightSearchEndpoint.java
new file mode 100644
index 0000000000..9cd6fa33d0
--- /dev/null
+++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/OMDBInsightSearchEndpoint.java
@@ -0,0 +1,389 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.api;
+
+import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.recon.ReconUtils;
+import org.apache.hadoop.ozone.recon.api.handlers.BucketHandler;
+import org.apache.hadoop.ozone.recon.api.types.KeyEntityInfo;
+import org.apache.hadoop.ozone.recon.api.types.KeyInsightInfoResponse;
+import org.apache.hadoop.ozone.recon.api.types.NSSummary;
+import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
+import org.apache.hadoop.ozone.recon.spi.impl.ReconNamespaceSummaryManagerImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.inject.Inject;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.util.Map;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.ArrayList;
+
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+import static 
org.apache.hadoop.ozone.recon.ReconConstants.DEFAULT_START_PREFIX;
+import static 
org.apache.hadoop.ozone.recon.ReconConstants.RECON_OPEN_KEY_DEFAULT_SEARCH_LIMIT;
+import static 
org.apache.hadoop.ozone.recon.ReconConstants.RECON_OPEN_KEY_SEARCH_DEFAULT_PREV_KEY;
+import static 
org.apache.hadoop.ozone.recon.ReconResponseUtils.noMatchedKeysResponse;
+import static 
org.apache.hadoop.ozone.recon.ReconResponseUtils.createBadRequestResponse;
+import static 
org.apache.hadoop.ozone.recon.ReconResponseUtils.createInternalServerErrorResponse;
+import static 
org.apache.hadoop.ozone.recon.ReconUtils.constructObjectPathWithPrefix;
+import static org.apache.hadoop.ozone.recon.ReconUtils.validateNames;
+import static 
org.apache.hadoop.ozone.recon.api.handlers.BucketHandler.getBucketHandler;
+import static 
org.apache.hadoop.ozone.recon.api.handlers.EntityHandler.normalizePath;
+import static 
org.apache.hadoop.ozone.recon.api.handlers.EntityHandler.parseRequestPath;
+
+/**
+ * REST endpoint for search implementation in OM DB Insight.
+ */
+@Path("/keys")
+@Produces(MediaType.APPLICATION_JSON)
+@AdminOnly
+public class OMDBInsightSearchEndpoint {
+
+  private OzoneStorageContainerManager reconSCM;
+  private final ReconOMMetadataManager omMetadataManager;
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OMDBInsightSearchEndpoint.class);
+  private ReconNamespaceSummaryManagerImpl reconNamespaceSummaryManager;
+
+
+  @Inject
+  public OMDBInsightSearchEndpoint(OzoneStorageContainerManager reconSCM,
+                                   ReconOMMetadataManager omMetadataManager,
+                                   ReconNamespaceSummaryManagerImpl 
reconNamespaceSummaryManager) {
+    this.reconSCM = reconSCM;
+    this.omMetadataManager = omMetadataManager;
+    this.reconNamespaceSummaryManager = reconNamespaceSummaryManager;
+  }
+
+
+  /**
+   * Performs a search for open keys in the Ozone Manager (OM) database using 
a specified search prefix.
+   * This endpoint searches across both File System Optimized (FSO) and Object 
Store (non-FSO) layouts,
+   * compiling a list of keys that match the given prefix along with their 
data sizes.
+   * <p>
+   * The search prefix must start from the bucket level 
('/volumeName/bucketName/') or any specific directory
+   * or key level (e.g., '/volA/bucketA/dir1' for everything under 'dir1' 
inside 'bucketA' of 'volA').
+   * The search operation matches the prefix against the start of keys' names 
within the OM DB.
+   * <p>
+   * Example Usage:
+   * 1. A startPrefix of "/volA/bucketA/" retrieves every key under bucket 
'bucketA' in volume 'volA'.
+   * 2. Specifying "/volA/bucketA/dir1" focuses the search within 'dir1' 
inside 'bucketA' of 'volA'.
+   *
+   * @param startPrefix The prefix for searching keys, starting from the 
bucket level or any specific path.
+   * @param limit       Limits the number of returned keys.
+   * @param prevKey     The key to start after for the next set of records.
+   * @return A KeyInsightInfoResponse, containing matching keys and their data 
sizes.
+   * @throws IOException On failure to access the OM database or process the 
operation.
+   * @throws IllegalArgumentException If the provided startPrefix or other 
arguments are invalid.
+   */
+  @GET
+  @Path("/open/search")
+  public Response searchOpenKeys(
+      @DefaultValue(DEFAULT_START_PREFIX) @QueryParam("startPrefix")
+      String startPrefix,
+      @DefaultValue(RECON_OPEN_KEY_DEFAULT_SEARCH_LIMIT) @QueryParam("limit")
+      int limit,
+      @DefaultValue(RECON_OPEN_KEY_SEARCH_DEFAULT_PREV_KEY) 
@QueryParam("prevKey") String prevKey) throws IOException {
+
+    try {
+      // Ensure startPrefix is not null or empty and starts with '/'
+      if (startPrefix == null || startPrefix.length() == 0) {
+        return createBadRequestResponse(
+            "Invalid startPrefix: Path must be at the bucket level or 
deeper.");
+      }
+      startPrefix = startPrefix.startsWith("/") ? startPrefix : "/" + 
startPrefix;
+
+      // Split the path to ensure it's at least at the bucket level
+      String[] pathComponents = startPrefix.split("/");
+      if (pathComponents.length < 3 || pathComponents[2].isEmpty()) {
+        return createBadRequestResponse(
+            "Invalid startPrefix: Path must be at the bucket level or 
deeper.");
+      }
+
+      // Ensure the limit is non-negative
+      limit = Math.max(0, limit);
+
+      // Initialize response object
+      KeyInsightInfoResponse insightResponse = new KeyInsightInfoResponse();
+      long replicatedTotal = 0;
+      long unreplicatedTotal = 0;
+      boolean keysFound = false; // Flag to track if any keys are found
+      String lastKey = null;
+
+      // Search for non-fso keys in KeyTable
+      Table<String, OmKeyInfo> openKeyTable =
+          omMetadataManager.getOpenKeyTable(BucketLayout.LEGACY);
+      Map<String, OmKeyInfo> obsKeys =
+          retrieveKeysFromTable(openKeyTable, startPrefix, limit, prevKey);
+      for (Map.Entry<String, OmKeyInfo> entry : obsKeys.entrySet()) {
+        keysFound = true;
+        KeyEntityInfo keyEntityInfo =
+            createKeyEntityInfoFromOmKeyInfo(entry.getKey(), entry.getValue());
+        insightResponse.getNonFSOKeyInfoList()
+            .add(keyEntityInfo); // Add to non-FSO list
+        replicatedTotal += entry.getValue().getReplicatedSize();
+        unreplicatedTotal += entry.getValue().getDataSize();
+        lastKey = entry.getKey(); // Update lastKey
+      }
+
+      // Search for fso keys in FileTable
+      Map<String, OmKeyInfo> fsoKeys = searchOpenKeysInFSO(startPrefix, limit, 
prevKey);
+      for (Map.Entry<String, OmKeyInfo> entry : fsoKeys.entrySet()) {
+        keysFound = true;
+        KeyEntityInfo keyEntityInfo =
+            createKeyEntityInfoFromOmKeyInfo(entry.getKey(), entry.getValue());
+        insightResponse.getFsoKeyInfoList()
+            .add(keyEntityInfo); // Add to FSO list
+        replicatedTotal += entry.getValue().getReplicatedSize();
+        unreplicatedTotal += entry.getValue().getDataSize();
+        lastKey = entry.getKey(); // Update lastKey
+      }
+
+      // If no keys were found, return a response indicating that no keys 
matched
+      if (!keysFound) {
+        return noMatchedKeysResponse(startPrefix);
+      }
+
+      // Set the aggregated totals in the response
+      insightResponse.setReplicatedDataSize(replicatedTotal);
+      insightResponse.setUnreplicatedDataSize(unreplicatedTotal);
+      insightResponse.setLastKey(lastKey);
+
+      // Return the response with the matched keys and their data sizes
+      return Response.ok(insightResponse).build();
+    } catch (IOException e) {
+      // Handle IO exceptions and return an internal server error response
+      return createInternalServerErrorResponse(
+          "Error searching open keys in OM DB: " + e.getMessage());
+    } catch (IllegalArgumentException e) {
+      // Handle illegal argument exceptions and return a bad request response
+      return createBadRequestResponse(
+          "Invalid startPrefix: " + e.getMessage());
+    }
+  }
+
+  public Map<String, OmKeyInfo> searchOpenKeysInFSO(String startPrefix,
+                                                    int limit, String prevKey)
+      throws IOException, IllegalArgumentException {
+    Map<String, OmKeyInfo> matchedKeys = new LinkedHashMap<>();
+    // Convert the search prefix to an object path for FSO buckets
+    String startPrefixObjectPath = convertToObjectPath(startPrefix);
+    String[] names = parseRequestPath(startPrefixObjectPath);
+    Table<String, OmKeyInfo> openFileTable =
+        omMetadataManager.getOpenKeyTable(BucketLayout.FILE_SYSTEM_OPTIMIZED);
+
+    // If names.length <= 2, then the search prefix is at the volume or bucket 
level hence
+    // no need to find parent or extract id's or find subpaths as the 
openFileTable is
+    // suitable for volume and bucket level search
+    if (names.length > 2 && startPrefixObjectPath.endsWith(OM_KEY_PREFIX)) {
+      // Fetch the parent ID to search for
+      long parentId = Long.parseLong(names[names.length - 1]);
+
+      // Fetch the nameSpaceSummary for the parent ID
+      NSSummary parentSummary = 
reconNamespaceSummaryManager.getNSSummary(parentId);
+      if (parentSummary == null) {
+        return matchedKeys;
+      }
+      List<String> subPaths = new ArrayList<>();
+      // Add the initial search prefix object path because it can have both 
openFiles
+      // and subdirectories with openFiles
+      subPaths.add(startPrefixObjectPath);
+
+      // Recursively gather all subpaths
+      ReconUtils.gatherSubPaths(parentId, subPaths, Long.parseLong(names[0]), 
Long.parseLong(names[1]),
+          reconNamespaceSummaryManager);
+
+      // Iterate over the subpaths and retrieve the open files
+      for (String subPath : subPaths) {
+        matchedKeys.putAll(retrieveKeysFromTable(openFileTable, subPath, limit 
- matchedKeys.size(), prevKey));
+        if (matchedKeys.size() >= limit) {
+          break;
+        }
+      }
+      return matchedKeys;
+    }
+
+    // If the search level is at the volume, bucket or key level, directly 
search the openFileTable
+    matchedKeys.putAll(retrieveKeysFromTable(openFileTable, 
startPrefixObjectPath, limit, prevKey));
+    return matchedKeys;
+  }
+
+  /**
+   * Converts a key prefix into an object path for FSO buckets, using IDs.
+   *
+   * This method transforms a user-provided path (e.g., "volume/bucket/dir1") 
into
+   * a database-friendly format ("/volumeID/bucketID/ParentId/") by replacing 
names
+   * with their corresponding IDs. It simplifies database queries for FSO 
bucket operations.
+   *
+   * Examples:
+   * - Input: "volume/bucket/key" -> Output: 
"/volumeID/bucketID/parentDirID/key"
+   * - Input: "volume/bucket/dir1" -> Output: "/volumeID/bucketID/dir1ID/"
+   * - Input: "volume/bucket/dir1/key1" -> Output: 
"/volumeID/bucketID/dir1ID/key1"
+   * - Input: "volume/bucket/dir1/dir2" -> Output: "/volumeID/bucketID/dir2ID/"
+   *
+   * @param prevKeyPrefix The path to be converted.
+   * @return The object path as "/volumeID/bucketID/ParentId/" or an empty 
string if an error occurs.
+   * @throws IOException If database access fails.
+   * @throws IllegalArgumentException If the provided path is invalid or 
cannot be converted.
+   */
+  public String convertToObjectPath(String prevKeyPrefix) throws IOException {
+    try {
+      String[] names = parseRequestPath(normalizePath(prevKeyPrefix, 
BucketLayout.FILE_SYSTEM_OPTIMIZED));
+      Table<String, OmKeyInfo> openFileTable = 
omMetadataManager.getOpenKeyTable(BucketLayout.FILE_SYSTEM_OPTIMIZED);
+
+      // Root-Level: Return the original path
+      if (names.length == 0) {
+        return prevKeyPrefix;
+      }
+
+      // Volume-Level: Fetch the volumeID
+      String volumeName = names[0];
+      validateNames(volumeName);
+      String volumeKey = omMetadataManager.getVolumeKey(volumeName);
+      long volumeId = 
omMetadataManager.getVolumeTable().getSkipCache(volumeKey).getObjectID();
+      if (names.length == 1) {
+        return constructObjectPathWithPrefix(volumeId);
+      }
+
+      // Bucket-Level: Fetch the bucketID
+      String bucketName = names[1];
+      validateNames(bucketName);
+      String bucketKey = omMetadataManager.getBucketKey(volumeName, 
bucketName);
+      OmBucketInfo bucketInfo = 
omMetadataManager.getBucketTable().getSkipCache(bucketKey);
+      long bucketId = bucketInfo.getObjectID();
+      if (names.length == 2 || bucketInfo.getBucketLayout() != 
BucketLayout.FILE_SYSTEM_OPTIMIZED) {
+        return constructObjectPathWithPrefix(volumeId, bucketId);
+      }
+
+      // Directory or Key-Level: Check both key and directory
+      BucketHandler handler =
+          getBucketHandler(reconNamespaceSummaryManager, omMetadataManager, 
reconSCM, bucketInfo);
+
+      if (names.length >= 3) {
+        String lastEntiry = names[names.length - 1];
+
+        // Check if the directory exists
+        OmDirectoryInfo dirInfo = handler.getDirInfo(names);
+        if (dirInfo != null && dirInfo.getName().equals(lastEntiry)) {
+          return constructObjectPathWithPrefix(volumeId, bucketId, 
dirInfo.getObjectID()) + OM_KEY_PREFIX;
+        }
+
+        // Check if the key exists
+        long dirID = handler.getDirObjectId(names, names.length);
+        String keyKey = constructObjectPathWithPrefix(volumeId, bucketId, 
dirID) +
+                OM_KEY_PREFIX + lastEntiry;
+        OmKeyInfo keyInfo = openFileTable.getSkipCache(keyKey);
+        if (keyInfo != null && keyInfo.getFileName().equals(lastEntiry)) {
+          return constructObjectPathWithPrefix(volumeId, bucketId,
+              keyInfo.getParentObjectID()) + OM_KEY_PREFIX + lastEntiry;
+        }
+
+        return prevKeyPrefix;
+      }
+    } catch (IllegalArgumentException e) {
+      LOG.error(
+          "IllegalArgumentException encountered while converting key prefix to 
object path: {}",
+          prevKeyPrefix, e);
+      throw e;
+    } catch (RuntimeException e) {
+      LOG.error(
+          "RuntimeException encountered while converting key prefix to object 
path: {}",
+          prevKeyPrefix, e);
+      return prevKeyPrefix;
+    }
+    return prevKeyPrefix;
+  }
+
+
+  /**
+   * Common method to retrieve keys from a table based on a search prefix and 
a limit.
+   *
+   * @param table       The table to retrieve keys from.
+   * @param startPrefix The search prefix to match keys against.
+   * @param limit       The maximum number of keys to retrieve.
+   * @param prevKey     The key to start after for the next set of records.
+   * @return A map of keys and their corresponding OmKeyInfo objects.
+   * @throws IOException If there are problems accessing the table.
+   */
+  private Map<String, OmKeyInfo> retrieveKeysFromTable(
+      Table<String, OmKeyInfo> table, String startPrefix, int limit, String 
prevKey)
+      throws IOException {
+    Map<String, OmKeyInfo> matchedKeys = new LinkedHashMap<>();
+    try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>> 
keyIter = table.iterator()) {
+      // If a previous key is provided, seek to the previous key and skip it.
+      if (!prevKey.isEmpty()) {
+        keyIter.seek(prevKey);
+        if (keyIter.hasNext()) {
+          // Skip the previous key
+          keyIter.next();
+        }
+      } else {
+        // If no previous key is provided, start from the search prefix.
+        keyIter.seek(startPrefix);
+      }
+      while (keyIter.hasNext() && matchedKeys.size() < limit) {
+        Table.KeyValue<String, OmKeyInfo> entry = keyIter.next();
+        String dbKey = entry.getKey();
+        if (!dbKey.startsWith(startPrefix)) {
+          break; // Exit the loop if the key no longer matches the prefix
+        }
+        matchedKeys.put(dbKey, entry.getValue());
+      }
+    } catch (IOException exception) {
+      LOG.error("Error retrieving keys from table for path: {}", startPrefix, 
exception);
+      throw exception;
+    }
+    return matchedKeys;
+  }
+
+  /**
+   * Creates a KeyEntityInfo object from an OmKeyInfo object and the 
corresponding key.
+   *
+   * @param dbKey   The key in the database corresponding to the OmKeyInfo 
object.
+   * @param keyInfo The OmKeyInfo object to create the KeyEntityInfo from.
+   * @return The KeyEntityInfo object created from the OmKeyInfo object and 
the key.
+   */
+  private KeyEntityInfo createKeyEntityInfoFromOmKeyInfo(String dbKey,
+                                                         OmKeyInfo keyInfo) {
+    KeyEntityInfo keyEntityInfo = new KeyEntityInfo();
+    keyEntityInfo.setKey(dbKey); // Set the DB key
+    keyEntityInfo.setPath(keyInfo.getKeyName()); // Assuming path is the same 
as key name
+    keyEntityInfo.setInStateSince(keyInfo.getCreationTime());
+    keyEntityInfo.setSize(keyInfo.getDataSize());
+    keyEntityInfo.setReplicatedSize(keyInfo.getReplicatedSize());
+    keyEntityInfo.setReplicationConfig(keyInfo.getReplicationConfig());
+    return keyEntityInfo;
+  }
+
+}
diff --git 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOMDBInsightSearchEndpoint.java
 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOMDBInsightSearchEndpoint.java
new file mode 100644
index 0000000000..ab16f349af
--- /dev/null
+++ 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOMDBInsightSearchEndpoint.java
@@ -0,0 +1,796 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.api;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+
+import org.apache.hadoop.ozone.recon.api.types.KeyInsightInfoResponse;
+import org.apache.hadoop.ozone.recon.persistence.AbstractReconSqlDBTest;
+import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManager;
+import org.apache.hadoop.ozone.recon.spi.ReconNamespaceSummaryManager;
+import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider;
+import org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl;
+import 
org.apache.hadoop.ozone.recon.spi.impl.StorageContainerServiceProviderImpl;
+import org.apache.hadoop.ozone.recon.tasks.NSSummaryTaskWithFSO;
+import org.junit.jupiter.api.BeforeEach;
+
+import static 
org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getTestReconOmMetadataManager;
+import static 
org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.writeDirToOm;
+import static 
org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.writeOpenFileToOm;
+import static 
org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.writeOpenKeyToOm;
+import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.Mockito.mock;
+
+import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.recon.ReconTestInjector;
+import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
+import org.apache.hadoop.ozone.recon.scm.ReconStorageContainerManagerFacade;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import javax.ws.rs.core.Response;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.UUID;
+
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_DB_DIRS;
+
+/**
+ * Test class for OMDBInsightSearchEndpoint.
+ *
+ * This class tests various scenarios for searching open keys within a
+ * given volume, bucket, and directory structure. The tests include:
+ *
+ * 1. Test Root Level Search Restriction: Ensures searching at the root level 
returns a bad request.
+ * 2. Test Volume Level Search Restriction: Ensures searching at the volume 
level returns a bad request.
+ * 3. Test Bucket Level Search: Verifies search results within different types 
of buckets (FSO, OBS, Legacy).
+ * 4. Test Directory Level Search: Validates searching inside specific 
directories.
+ * 5. Test Key Level Search: Confirms search results for specific keys within 
buckets.
+ * 6. Test Key Level Search Under Directory: Verifies searching for keys 
within nested directories.
+ * 7. Test Search Under Nested Directory: Checks search results within nested 
directories under dira3.
+ * 8. Test Limit Search: Tests the limit functionality of the search API.
+ * 9. Test Search Open Keys with Bad Request: Ensures bad requests with 
invalid parameters return appropriate responses.
+ * 10. Test Last Key in Response: Confirms the presence of the last key in 
paginated responses.
+ * 11. Test Search Open Keys with Pagination: Verifies paginated search 
results.
+ * 12. Test Search in Empty Bucket: Checks the response for searching within 
an empty bucket.
+ */
+public class TestOMDBInsightSearchEndpoint extends AbstractReconSqlDBTest {
+
+  @TempDir
+  private Path temporaryFolder;
+  private ReconOMMetadataManager reconOMMetadataManager;
+  private OMDBInsightSearchEndpoint omdbInsightSearchEndpoint;
+  private OzoneConfiguration ozoneConfiguration;
+  private static final String ROOT_PATH = "/";
+  private static final String TEST_USER = "TestUser";
+  private OMMetadataManager omMetadataManager;
+
+  private ReconNamespaceSummaryManager reconNamespaceSummaryManager;
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    ozoneConfiguration = new OzoneConfiguration();
+    ozoneConfiguration.setLong(OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD,
+        100);
+    omMetadataManager = initializeNewOmMetadataManager(
+        Files.createDirectory(temporaryFolder.resolve("JunitOmDBDir"))
+            .toFile());
+    reconOMMetadataManager = getTestReconOmMetadataManager(omMetadataManager,
+        
Files.createDirectory(temporaryFolder.resolve("OmMetataDir")).toFile());
+
+    ReconTestInjector reconTestInjector =
+        new ReconTestInjector.Builder(temporaryFolder.toFile())
+            .withReconSqlDb()
+            .withReconOm(reconOMMetadataManager)
+            .withOmServiceProvider(mock(OzoneManagerServiceProviderImpl.class))
+            .addBinding(OzoneStorageContainerManager.class,
+                ReconStorageContainerManagerFacade.class)
+            .withContainerDB()
+            .addBinding(StorageContainerServiceProvider.class,
+                mock(StorageContainerServiceProviderImpl.class))
+            .addBinding(OMDBInsightEndpoint.class)
+            .addBinding(ContainerHealthSchemaManager.class)
+            .build();
+    reconNamespaceSummaryManager =
+        reconTestInjector.getInstance(ReconNamespaceSummaryManager.class);
+    omdbInsightSearchEndpoint = reconTestInjector.getInstance(
+        OMDBInsightSearchEndpoint.class);
+
+    // populate OM DB and reprocess into Recon RocksDB
+    populateOMDB();
+    NSSummaryTaskWithFSO nSSummaryTaskWithFso =
+        new NSSummaryTaskWithFSO(reconNamespaceSummaryManager,
+            reconOMMetadataManager, ozoneConfiguration);
+    nSSummaryTaskWithFso.reprocessWithFSO(reconOMMetadataManager);
+  }
+
+  /**
+   * Create a new OM Metadata manager instance with one user, one vol, and two
+   * buckets.
+   *
+   * @throws IOException ioEx
+   */
+  private static OMMetadataManager initializeNewOmMetadataManager(
+      File omDbDir)
+      throws IOException {
+    OzoneConfiguration omConfiguration = new OzoneConfiguration();
+    omConfiguration.set(OZONE_OM_DB_DIRS,
+        omDbDir.getAbsolutePath());
+    OMMetadataManager omMetadataManager = new OmMetadataManagerImpl(
+        omConfiguration, null);
+    return omMetadataManager;
+  }
+
+  @Test
+  public void testRootLevelSearchRestriction() throws IOException {
+    // Test with root level path
+    String rootPath = "/";
+    Response response = omdbInsightSearchEndpoint.searchOpenKeys(rootPath, 20, 
"");
+    assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), 
response.getStatus());
+    String entity = (String) response.getEntity();
+    assertTrue(entity.contains("Invalid startPrefix: Path must be at the 
bucket level or deeper"),
+        "Expected a message indicating the path must be at the bucket level or 
deeper");
+
+    // Test with root level path without trailing slash
+    rootPath = "";
+    response = omdbInsightSearchEndpoint.searchOpenKeys(rootPath, 20, "");
+    assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), 
response.getStatus());
+    entity = (String) response.getEntity();
+    assertTrue(entity.contains("Invalid startPrefix: Path must be at the 
bucket level or deeper"),
+        "Expected a message indicating the path must be at the bucket level or 
deeper");
+  }
+
+  @Test
+  public void testVolumeLevelSearchRestriction() throws IOException {
+    // Test with volume level path
+    String volumePath = "/vola";
+    Response response = omdbInsightSearchEndpoint.searchOpenKeys(volumePath, 
20, "");
+    assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), 
response.getStatus());
+    String entity = (String) response.getEntity();
+    assertTrue(entity.contains("Invalid startPrefix: Path must be at the 
bucket level or deeper"),
+        "Expected a message indicating the path must be at the bucket level or 
deeper");
+
+    // Test with another volume level path
+    volumePath = "/volb";
+    response = omdbInsightSearchEndpoint.searchOpenKeys(volumePath, 20, "");
+    assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), 
response.getStatus());
+    entity = (String) response.getEntity();
+    assertTrue(entity.contains("Invalid startPrefix: Path must be at the 
bucket level or deeper"),
+        "Expected a message indicating the path must be at the bucket level or 
deeper");
+  }
+
+  @Test
+  public void testBucketLevelSearch() throws IOException {
+    // Search inside FSO bucket
+    Response response =
+        omdbInsightSearchEndpoint.searchOpenKeys("/vola/bucketa1", 20, "");
+    assertEquals(200, response.getStatus());
+    KeyInsightInfoResponse result =
+        (KeyInsightInfoResponse) response.getEntity();
+    assertEquals(14, result.getFsoKeyInfoList().size());
+    assertEquals(0, result.getNonFSOKeyInfoList().size());
+    // Assert Total Size
+    assertEquals(14000, result.getUnreplicatedDataSize());
+    assertEquals(14000 * 3, result.getReplicatedDataSize());
+
+    // Search inside OBS bucket
+    response =
+        omdbInsightSearchEndpoint.searchOpenKeys("/volb/bucketb1", 20, "");
+    assertEquals(200, response.getStatus());
+    result =
+        (KeyInsightInfoResponse) response.getEntity();
+    assertEquals(5, result.getNonFSOKeyInfoList().size());
+    assertEquals(0, result.getFsoKeyInfoList().size());
+    // Assert Total Size
+    assertEquals(5000, result.getUnreplicatedDataSize());
+    assertEquals(5000 * 3, result.getReplicatedDataSize());
+
+    // Search Inside LEGACY bucket
+    response =
+        omdbInsightSearchEndpoint.searchOpenKeys("/volc/bucketc1", 20, "");
+    assertEquals(200, response.getStatus());
+    result = (KeyInsightInfoResponse) response.getEntity();
+    assertEquals(7, result.getNonFSOKeyInfoList().size());
+
+    // Test with bucket that does not exist
+    response = 
omdbInsightSearchEndpoint.searchOpenKeys("/vola/nonexistentbucket", 20, "");
+    assertEquals(Response.Status.NOT_FOUND.getStatusCode(), 
response.getStatus());
+    String entity = (String) response.getEntity();
+    assertTrue(entity.contains("No keys matched the search prefix"),
+        "Expected a message indicating no keys were found");
+  }
+
+  @Test
+  public void testDirectoryLevelSearch() throws IOException {
+    Response response =
+        omdbInsightSearchEndpoint.searchOpenKeys("/vola/bucketa1/dira1", 20, 
"");
+    assertEquals(200, response.getStatus());
+    KeyInsightInfoResponse result =
+        (KeyInsightInfoResponse) response.getEntity();
+    assertEquals(1, result.getFsoKeyInfoList().size());
+    assertEquals(0, result.getNonFSOKeyInfoList().size());
+    // Assert Total Size
+    assertEquals(1000, result.getUnreplicatedDataSize());
+    assertEquals(1000 * 3, result.getReplicatedDataSize());
+
+    response =
+        omdbInsightSearchEndpoint.searchOpenKeys("/vola/bucketa1/dira2", 20, 
"");
+    assertEquals(200, response.getStatus());
+    result =
+        (KeyInsightInfoResponse) response.getEntity();
+    assertEquals(1, result.getFsoKeyInfoList().size());
+    assertEquals(0, result.getNonFSOKeyInfoList().size());
+    // Assert Total Size
+    assertEquals(1000, result.getUnreplicatedDataSize());
+    assertEquals(1000 * 3, result.getReplicatedDataSize());
+
+    response =
+        omdbInsightSearchEndpoint.searchOpenKeys("/vola/bucketa1/dira3", 20, 
"");
+    assertEquals(200, response.getStatus());
+    result =
+        (KeyInsightInfoResponse) response.getEntity();
+    assertEquals(10, result.getFsoKeyInfoList().size());
+    assertEquals(0, result.getNonFSOKeyInfoList().size());
+    // Assert Total Size
+    assertEquals(10000, result.getUnreplicatedDataSize());
+    assertEquals(10000 * 3, result.getReplicatedDataSize());
+
+    // Test with non-existent directory
+    response = 
omdbInsightSearchEndpoint.searchOpenKeys("/vola/bucketa1/nonexistentdir", 20, 
"");
+    assertEquals(Response.Status.NOT_FOUND.getStatusCode(), 
response.getStatus());
+    String entity = (String) response.getEntity();
+    assertTrue(entity.contains("No keys matched the search prefix"),
+        "Expected a message indicating no keys were found");
+  }
+
+  @Test
+  public void testKeyLevelSearch() throws IOException {
+    // FSO Bucket key-level search
+    Response response = 
omdbInsightSearchEndpoint.searchOpenKeys("/vola/bucketa1/filea1", 10, "");
+    assertEquals(200, response.getStatus());
+    KeyInsightInfoResponse result = (KeyInsightInfoResponse) 
response.getEntity();
+    assertEquals(1, result.getFsoKeyInfoList().size());
+    assertEquals(0, result.getNonFSOKeyInfoList().size());
+    // Assert Total Size
+    assertEquals(1000, result.getUnreplicatedDataSize());
+    assertEquals(1000 * 3, result.getReplicatedDataSize());
+
+    response = 
omdbInsightSearchEndpoint.searchOpenKeys("/vola/bucketa1/filea2", 10, "");
+    assertEquals(200, response.getStatus());
+    result = (KeyInsightInfoResponse) response.getEntity();
+    assertEquals(1, result.getFsoKeyInfoList().size());
+    assertEquals(0, result.getNonFSOKeyInfoList().size());
+    // Assert Total Size
+    assertEquals(1000, result.getUnreplicatedDataSize());
+    assertEquals(1000 * 3, result.getReplicatedDataSize());
+
+    // OBS Bucket key-level search
+    response = 
omdbInsightSearchEndpoint.searchOpenKeys("/volb/bucketb1/fileb1", 10, "");
+    assertEquals(200, response.getStatus());
+    result = (KeyInsightInfoResponse) response.getEntity();
+    assertEquals(0, result.getFsoKeyInfoList().size());
+    assertEquals(1, result.getNonFSOKeyInfoList().size());
+    // Assert Total Size
+    assertEquals(1000, result.getUnreplicatedDataSize());
+    assertEquals(1000 * 3, result.getReplicatedDataSize());
+
+    response = 
omdbInsightSearchEndpoint.searchOpenKeys("/volb/bucketb1/fileb2", 10, "");
+    assertEquals(200, response.getStatus());
+    result = (KeyInsightInfoResponse) response.getEntity();
+    assertEquals(0, result.getFsoKeyInfoList().size());
+    assertEquals(1, result.getNonFSOKeyInfoList().size());
+    // Assert Total Size
+    assertEquals(1000, result.getUnreplicatedDataSize());
+    assertEquals(1000 * 3, result.getReplicatedDataSize());
+
+    // Test with non-existent key
+    response = 
omdbInsightSearchEndpoint.searchOpenKeys("/vola/bucketa1/nonexistentfile", 1, 
"");
+    assertEquals(Response.Status.NOT_FOUND.getStatusCode(), 
response.getStatus());
+    String entity = (String) response.getEntity();
+    assertTrue(entity.contains("No keys matched the search prefix"),
+        "Expected a message indicating no keys were found");
+
+    response = 
omdbInsightSearchEndpoint.searchOpenKeys("/volb/bucketb1/nonexistentfile", 1, 
"");
+    assertEquals(Response.Status.NOT_FOUND.getStatusCode(), 
response.getStatus());
+    entity = (String) response.getEntity();
+    assertTrue(entity.contains("No keys matched the search prefix"),
+        "Expected a message indicating no keys were found");
+  }
+
+  // Test searching for keys under a directory
+  @Test
+  public void testKeyLevelSearchUnderDirectory() throws IOException {
+    // FSO Bucket key-level search
+    Response response =
+        
omdbInsightSearchEndpoint.searchOpenKeys("/vola/bucketa1/dira1/innerfile", 10, 
"");
+    assertEquals(200, response.getStatus());
+    KeyInsightInfoResponse result = (KeyInsightInfoResponse) 
response.getEntity();
+    assertEquals(1, result.getFsoKeyInfoList().size());
+    assertEquals(0, result.getNonFSOKeyInfoList().size());
+
+    response =
+        
omdbInsightSearchEndpoint.searchOpenKeys("/vola/bucketa1/dira2/innerfile", 10, 
"");
+    assertEquals(200, response.getStatus());
+    result = (KeyInsightInfoResponse) response.getEntity();
+    assertEquals(1, result.getFsoKeyInfoList().size());
+    assertEquals(0, result.getNonFSOKeyInfoList().size());
+
+    // Test for unknown file in fso bucket
+    response = 
omdbInsightSearchEndpoint.searchOpenKeys("/vola/bucketa1/dira1/unknownfile", 
10, "");
+    assertEquals(Response.Status.NOT_FOUND.getStatusCode(), 
response.getStatus());
+    String entity = (String) response.getEntity();
+    assertTrue(entity.contains("No keys matched the search prefix"),
+        "Expected a message indicating no keys were found");
+
+    // Test for unknown file in fso bucket
+    response = 
omdbInsightSearchEndpoint.searchOpenKeys("/vola/bucketa1/dira2/unknownfile", 
10, "");
+    assertEquals(Response.Status.NOT_FOUND.getStatusCode(), 
response.getStatus());
+    entity = (String) response.getEntity();
+    assertTrue(entity.contains("No keys matched the search prefix"),
+        "Expected a message indicating no keys were found");
+  }
+
+
+  @Test
+  public void testSearchUnderNestedDirectory() throws IOException {
+    Response response = 
omdbInsightSearchEndpoint.searchOpenKeys("/vola/bucketa1/dira3", 20,
+        "");
+    assertEquals(200, response.getStatus());
+    KeyInsightInfoResponse result = (KeyInsightInfoResponse) 
response.getEntity();
+    assertEquals(10, result.getFsoKeyInfoList().size());
+    assertEquals(0, result.getNonFSOKeyInfoList().size());
+
+    // Search under dira31
+    response = 
omdbInsightSearchEndpoint.searchOpenKeys("/vola/bucketa1/dira3/dira31",
+        20, "");
+    assertEquals(200, response.getStatus());
+    result = (KeyInsightInfoResponse) response.getEntity();
+    assertEquals(6, result.getFsoKeyInfoList().size());
+    assertEquals(0, result.getNonFSOKeyInfoList().size());
+
+    // Search under dira32
+    response = omdbInsightSearchEndpoint.searchOpenKeys(
+        "/vola/bucketa1/dira3/dira31/dira32", 20, "");
+    assertEquals(200, response.getStatus());
+    result = (KeyInsightInfoResponse) response.getEntity();
+    assertEquals(3, result.getFsoKeyInfoList().size());
+    assertEquals(0, result.getNonFSOKeyInfoList().size());
+
+    // Search under dira33
+    response = omdbInsightSearchEndpoint.searchOpenKeys(
+        "/vola/bucketa1/dira3/dira31/dira32/dira33", 20, "");
+    assertEquals(200, response.getStatus());
+    result = (KeyInsightInfoResponse) response.getEntity();
+    assertEquals(1, result.getFsoKeyInfoList().size());
+    assertEquals(0, result.getNonFSOKeyInfoList().size());
+
+    // Search for the exact file under dira33
+    response = omdbInsightSearchEndpoint.searchOpenKeys(
+        "/vola/bucketa1/dira3/dira31/dira32/dira33/file33_1", 20, "");
+    assertEquals(200, response.getStatus());
+    result = (KeyInsightInfoResponse) response.getEntity();
+    assertEquals(1, result.getFsoKeyInfoList().size());
+    assertEquals(0, result.getNonFSOKeyInfoList().size());
+
+    // Search for a non existant file under each nested directory
+    response = omdbInsightSearchEndpoint.searchOpenKeys(
+        "/vola/bucketa1/dira3/dira31/dira32/dira33/nonexistentfile", 20, "");
+    assertEquals(Response.Status.NOT_FOUND.getStatusCode(), 
response.getStatus());
+    String entity = (String) response.getEntity();
+    assertTrue(entity.contains("No keys matched the search prefix"),
+        "Expected a message indicating no keys were found");
+
+    response = omdbInsightSearchEndpoint.searchOpenKeys(
+        "/vola/bucketa1/dira3/dira31/dira32/nonexistentfile", 20, "");
+    assertEquals(Response.Status.NOT_FOUND.getStatusCode(), 
response.getStatus());
+    entity = (String) response.getEntity();
+    assertTrue(entity.contains("No keys matched the search prefix"),
+        "Expected a message indicating no keys were found");
+  }
+
+  @Test
+  public void testLimitSearch() throws IOException {
+    Response response =
+        omdbInsightSearchEndpoint.searchOpenKeys("/vola/bucketa1", 2, "");
+    assertEquals(200, response.getStatus());
+    KeyInsightInfoResponse result =
+        (KeyInsightInfoResponse) response.getEntity();
+    assertEquals(2, result.getFsoKeyInfoList().size());
+    assertEquals(0, result.getNonFSOKeyInfoList().size());
+  }
+
+  @Test
+  public void testSearchOpenKeysWithBadRequest() throws IOException {
+    // Give a negative limit
+    int negativeLimit = -1;
+    Response response = omdbInsightSearchEndpoint.searchOpenKeys("@323232", 
negativeLimit, "");
+
+    // Then the response should indicate that the request was bad
+    assertEquals(Response.Status.BAD_REQUEST.getStatusCode(),
+        response.getStatus(), "Expected a 400 BAD REQUEST status");
+
+    String entity = (String) response.getEntity();
+    assertTrue(entity.contains("Invalid startPrefix: Path must be at the 
bucket level or deeper"),
+        "Expected a message indicating the path must be at the bucket level or 
deeper");
+
+    response = omdbInsightSearchEndpoint.searchOpenKeys("///", 20, "");
+    assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), 
response.getStatus());
+    entity = (String) response.getEntity();
+    assertTrue(entity.contains("Invalid startPrefix: Path must be at the 
bucket level or deeper"),
+        "Expected a message indicating the path must be at the bucket level or 
deeper");
+  }
+
+  @Test
+  public void testLastKeyInResponse() throws IOException {
+    Response response =
+        omdbInsightSearchEndpoint.searchOpenKeys("/volb/bucketb1", 20, "");
+    assertEquals(200, response.getStatus());
+    KeyInsightInfoResponse result =
+        (KeyInsightInfoResponse) response.getEntity();
+    assertEquals(0, result.getFsoKeyInfoList().size());
+    assertEquals(5, result.getNonFSOKeyInfoList().size());
+    // Assert Total Size
+    assertEquals(5000, result.getUnreplicatedDataSize());
+    assertEquals(5000 * 3, result.getReplicatedDataSize());
+    // Assert Last Key
+    assertEquals(ROOT_PATH + "volb/bucketb1/fileb5", result.getLastKey(),
+        "Expected last key to be 'fileb5'");
+  }
+
+  @Test
+  public void testSearchOpenKeysWithPagination() throws IOException {
+    // Set the initial parameters
+    String startPrefix = "/volb/bucketb1";
+    int limit = 2;
+    String prevKey = "";
+
+    // Perform the first search request
+    Response response = omdbInsightSearchEndpoint.searchOpenKeys(startPrefix, 
limit, prevKey);
+    assertEquals(200, response.getStatus());
+    KeyInsightInfoResponse result = (KeyInsightInfoResponse) 
response.getEntity();
+    assertEquals(2, result.getNonFSOKeyInfoList().size());
+    assertEquals(0, result.getFsoKeyInfoList().size());
+
+    // Extract the last key from the response
+    prevKey = result.getLastKey();
+    assertNotNull(prevKey, "Last key should not be null");
+
+    // Perform the second search request using the last key
+    response = omdbInsightSearchEndpoint.searchOpenKeys(startPrefix, limit, 
prevKey);
+    assertEquals(200, response.getStatus());
+    result = (KeyInsightInfoResponse) response.getEntity();
+    assertEquals(2, result.getNonFSOKeyInfoList().size());
+    assertEquals(0, result.getFsoKeyInfoList().size());
+
+    // Extract the last key from the response
+    prevKey = result.getLastKey();
+    assertNotNull(prevKey, "Last key should not be null");
+
+    // Perform the third search request using the last key
+    response = omdbInsightSearchEndpoint.searchOpenKeys(startPrefix, limit, 
prevKey);
+    assertEquals(200, response.getStatus());
+    result = (KeyInsightInfoResponse) response.getEntity();
+    assertEquals(1, result.getNonFSOKeyInfoList().size());
+    assertEquals(0, result.getFsoKeyInfoList().size());
+    assertEquals(result.getNonFSOKeyInfoList().get(0).getKey(), 
result.getLastKey(),
+        "Expected last key to be empty");
+  }
+
+  @Test
+  public void testSearchInEmptyBucket() throws IOException {
+    // Search in empty bucket bucketb2
+    Response response = 
omdbInsightSearchEndpoint.searchOpenKeys("/volb/bucketb2", 20, "");
+    assertEquals(404, response.getStatus());
+    String entity = (String) response.getEntity();
+    assertTrue(entity.contains("No keys matched the search prefix"),
+        "Expected a message indicating no keys were found");
+  }
+
+  /**
+   * Tests the NSSummaryEndpoint for a given volume, bucket, and directory 
structure.
+   * The test setup mimics the following filesystem structure with specified 
sizes:
+   *
+   * root   (Total Size: 15000KB)
+   * ├── vola   (Total Size: 10000KB)
+   * │   ├── bucketa1   (FSO) Total Size: 5000KB
+   * │   │   ├── filea1 (Size: 1000KB)
+   * │   │   ├── filea2 (Size: 1000KB)
+   * │   │   ├── dira1 (Total Size: 1000KB)
+   * │   │   ├── dira2 (Total Size: 1000KB)
+   * │   │   └── dira3 (Total Size: 1000KB)
+   * │   │       ├── dira31 (Total Size: 1000KB)
+   * │   │       ├── dira32 (Total Size: 1000KB)
+   * │   │       └── dira33 (Total Size: 1000KB)
+   * │   ├── bucketa2   (FSO) Total Size: 5000KB
+   * │   │   ├── filea3 (Size: 1000KB)
+   * │   │   ├── filea4 (Size: 1000KB)
+   * │   │   ├── dira4 (Total Size: 1000KB)
+   * │   │   ├── dira5 (Total Size: 1000KB)
+   * │   │   └── dira6 (Total Size: 1000KB)
+   * └── volb   (Total Size: 5000KB)
+   *     ├── bucketb1   (OBS) Total Size: 5000KB
+   *     │   ├── fileb1 (Size: 1000KB)
+   *     │   ├── fileb2 (Size: 1000KB)
+   *     │   ├── fileb3 (Size: 1000KB)
+   *     │   ├── fileb4 (Size: 1000KB)
+   *     │   └── fileb5 (Size: 1000KB)
+   *     └── bucketb2   (OBS) Total Size: 0KB (Empty Bucket)
+   * └── volc   (Total Size: 7000KB)
+   *     └── bucketc1   (LEGACY) Total Size: 7000KB
+   *         ├── filec1 (Size: 1000KB)
+   *         ├── filec2 (Size: 1000KB)
+   *         ├── filec3 (Size: 1000KB)
+   *         ├── dirc1/ (Total Size: 2000KB)
+   *         └── dirc2/ (Total Size: 2000KB)
+   *
+   * @throws Exception
+   */
+  private void populateOMDB() throws Exception {
+    // Create Volumes
+    long volaObjectId = createVolume("vola");
+    createVolume("volb");
+    createVolume("volc");
+
+    // Create Buckets in vola
+    long bucketa1ObjectId =
+        createBucket("vola", "bucketa1", 1000 + 1000 + 1000 + 1000 + 1000,
+            getFSOBucketLayout());
+    long bucketa2ObjectId =
+        createBucket("vola", "bucketa2", 1000 + 1000 + 1000 + 1000 + 1000,
+            getFSOBucketLayout());
+
+    // Create Bucket in volb
+    createBucket("volb", "bucketb1", 1000 + 1000 + 1000 + 1000 + 1000,
+            getOBSBucketLayout());
+    createBucket("volb", "bucketb2", 0, getOBSBucketLayout()); // Empty Bucket
+
+    // Create Bucket in volc
+    createBucket("volc", "bucketc1", 7000,
+        getLegacyBucketLayout());
+
+    // Create Directories and Files under bucketa1
+    long dira1ObjectId =
+        createDirectory(bucketa1ObjectId, bucketa1ObjectId, volaObjectId,
+            "dira1");
+    long dira2ObjectId =
+        createDirectory(bucketa1ObjectId, bucketa1ObjectId, volaObjectId,
+            "dira2");
+    long dira3ObjectId =
+        createDirectory(bucketa1ObjectId, bucketa1ObjectId, volaObjectId,
+            "dira3");
+
+    // Create nested directories under dira3
+    long dira31ObjectId =
+        createDirectory(dira3ObjectId, bucketa1ObjectId, volaObjectId,
+            "dira31");
+    long dira32ObjectId =
+        createDirectory(dira31ObjectId, bucketa1ObjectId, volaObjectId,
+            "dira32");
+    long dira33ObjectId =
+        createDirectory(dira32ObjectId, bucketa1ObjectId, volaObjectId,
+            "dira33");
+
+    // Files directly under bucketa1
+    createOpenFile("filea1", "bucketa1", "vola", "filea1", bucketa1ObjectId,
+        bucketa1ObjectId, volaObjectId, 1000);
+    createOpenFile("filea2", "bucketa1", "vola", "filea2", bucketa1ObjectId,
+        bucketa1ObjectId, volaObjectId, 1000);
+
+    // Files under dira3
+    createOpenFile("dira3/file3_1", "bucketa1", "vola", "file3_1",
+        dira3ObjectId, bucketa1ObjectId, volaObjectId, 1000);
+    createOpenFile("dira3/file3_2", "bucketa1", "vola", "file3_2",
+        dira3ObjectId, bucketa1ObjectId, volaObjectId, 1000);
+    createOpenFile("dira3/file3_3", "bucketa1", "vola", "file3_3",
+        dira3ObjectId, bucketa1ObjectId, volaObjectId, 1000);
+    createOpenFile("dira3/file3_4", "bucketa1", "vola", "file3_4",
+        dira3ObjectId, bucketa1ObjectId, volaObjectId, 1000);
+
+    // Files under dira31
+    createOpenFile("dira3/dira31/file31_1", "bucketa1", "vola", "file31_1",
+        dira31ObjectId, bucketa1ObjectId, volaObjectId, 1000);
+    createOpenFile("dira3/dira31/file31_2", "bucketa1", "vola", "file31_2",
+        dira31ObjectId, bucketa1ObjectId, volaObjectId, 1000);
+    createOpenFile("dira3/dira31/file31_3", "bucketa1", "vola", "file31_3",
+        dira31ObjectId, bucketa1ObjectId, volaObjectId, 1000);
+
+    // Files under dira32
+    createOpenFile("dira3/dira31/dira32/file32_1", "bucketa1", "vola", 
"file32_1",
+        dira32ObjectId, bucketa1ObjectId, volaObjectId, 1000);
+    createOpenFile("dira3/dira31/dira32/file32_2", "bucketa1", "vola", 
"file32_2",
+        dira32ObjectId, bucketa1ObjectId, volaObjectId, 1000);
+
+    // Files under dira33
+    createOpenFile("dira3/dira31/dira32/dira33/file33_1", "bucketa1", "vola", 
"file33_1",
+        dira33ObjectId, bucketa1ObjectId, volaObjectId, 1000);
+
+    // Create Directories and Files under bucketa2
+    long dira4ObjectId =
+        createDirectory(bucketa2ObjectId, bucketa2ObjectId, volaObjectId,
+            "dira4");
+    long dira5ObjectId =
+        createDirectory(bucketa2ObjectId, bucketa2ObjectId, volaObjectId,
+            "dira5");
+    long dira6ObjectId =
+        createDirectory(bucketa2ObjectId, bucketa2ObjectId, volaObjectId,
+            "dira6");
+
+    // Files directly under bucketa2
+    createOpenFile("filea3", "bucketa2", "vola", "filea3", bucketa2ObjectId,
+        bucketa2ObjectId, volaObjectId, 1000);
+    createOpenFile("filea4", "bucketa2", "vola", "filea4", bucketa2ObjectId,
+        bucketa2ObjectId, volaObjectId, 1000);
+
+    // Files directly under bucketb1
+    createOpenKey("fileb1", "bucketb1", "volb", 1000);
+    createOpenKey("fileb2", "bucketb1", "volb", 1000);
+    createOpenKey("fileb3", "bucketb1", "volb", 1000);
+    createOpenKey("fileb4", "bucketb1", "volb", 1000);
+    createOpenKey("fileb5", "bucketb1", "volb", 1000);
+
+    // Create Inner files under directories
+    createOpenFile("dira1/innerfile", "bucketa1", "vola", "innerfile",
+        dira1ObjectId, bucketa1ObjectId, volaObjectId, 1000);
+    createOpenFile("dira2/innerfile", "bucketa1", "vola", "innerfile",
+        dira2ObjectId, bucketa1ObjectId, volaObjectId, 1000);
+    createOpenFile("dira4/innerfile", "bucketa2", "vola", "innerfile",
+        dira4ObjectId, bucketa2ObjectId, volaObjectId, 1000);
+    createOpenFile("dira5/innerfile", "bucketa2", "vola", "innerfile",
+        dira5ObjectId, bucketa2ObjectId, volaObjectId, 1000);
+    createOpenFile("dira6/innerfile", "bucketa2", "vola", "innerfile",
+        dira6ObjectId, bucketa2ObjectId, volaObjectId, 1000);
+
+    // Create Keys and Directories in bucketc1 (LEGACY layout)
+    createOpenKey("filec1", "bucketc1", "volc", 1000);
+    createOpenKey("filec2", "bucketc1", "volc", 1000);
+    createOpenKey("filec3", "bucketc1", "volc", 1000);
+    createOpenKey("dirc1/", "bucketc1", "volc", 2000); // Directory indicated 
by trailing slash
+    createOpenKey("dirc2/", "bucketc1", "volc", 2000); // Directory indicated 
by trailing slash
+    createOpenKey("dirc1/innerfile", "bucketc1", "volc", 2000); // File in 
directory
+    createOpenKey("dirc2/innerfile", "bucketc1", "volc", 2000); // File in 
directory
+  }
+
+  /**
+   * Create a volume and add it to the Volume Table.
+   *
+   * @return volume Object ID
+   * @throws IOException
+   */
+  private long createVolume(String volumeName) throws Exception {
+    String volumeKey = reconOMMetadataManager.getVolumeKey(volumeName);
+    long volumeId = UUID.randomUUID().getMostSignificantBits() &
+        Long.MAX_VALUE; // Generate positive ID
+    OmVolumeArgs args = OmVolumeArgs.newBuilder()
+        .setObjectID(volumeId)
+        .setVolume(volumeName)
+        .setAdminName(TEST_USER)
+        .setOwnerName(TEST_USER)
+        .build();
+
+    reconOMMetadataManager.getVolumeTable().put(volumeKey, args);
+    return volumeId;
+  }
+
+  /**
+   * Create a bucket and add it to the Bucket Table.
+   *
+   * @return bucket Object ID
+   * @throws IOException
+   */
+  private long createBucket(String volumeName, String bucketName, long 
dataSize,
+                            BucketLayout bucketLayout)
+      throws Exception {
+    String bucketKey =
+        reconOMMetadataManager.getBucketKey(volumeName, bucketName);
+    long bucketId = UUID.randomUUID().getMostSignificantBits() &
+        Long.MAX_VALUE; // Generate positive ID
+    OmBucketInfo bucketInfo = OmBucketInfo.newBuilder()
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setObjectID(bucketId)
+        .setBucketLayout(bucketLayout)
+        .setUsedBytes(dataSize)
+        .build();
+
+    reconOMMetadataManager.getBucketTable().put(bucketKey, bucketInfo);
+    return bucketId;
+  }
+
+  /**
+   * Create a directory and add it to the Directory Table.
+   *
+   * @return directory Object ID
+   * @throws IOException
+   */
+  private long createDirectory(long parentObjectId,
+                               long bucketObjectId,
+                               long volumeObjectId,
+                               String dirName) throws IOException {
+    long objectId = UUID.randomUUID().getMostSignificantBits() &
+        Long.MAX_VALUE; // Ensure positive ID
+    writeDirToOm(reconOMMetadataManager, objectId, parentObjectId,
+        bucketObjectId,
+        volumeObjectId, dirName);
+    return objectId;
+  }
+
+  /**
+   * Create a file and add it to the Open File Table.
+   *
+   * @return file Object ID
+   * @throws IOException
+   */
+  @SuppressWarnings("checkstyle:ParameterNumber")
+  private long createOpenFile(String key,
+                              String bucket,
+                              String volume,
+                              String fileName,
+                              long parentObjectId,
+                              long bucketObjectId,
+                              long volumeObjectId,
+                              long dataSize) throws IOException {
+    long objectId = UUID.randomUUID().getMostSignificantBits() &
+        Long.MAX_VALUE; // Ensure positive ID
+    writeOpenFileToOm(reconOMMetadataManager, key, bucket, volume, fileName,
+        objectId, parentObjectId, bucketObjectId, volumeObjectId, null,
+        dataSize);
+    return objectId;
+  }
+
+  /**
+   * Create a key and add it to the Open Key Table.
+   *
+   * @return key Object ID
+   * @throws IOException
+   */
+  private long createOpenKey(String key,
+                             String bucket,
+                             String volume,
+                             long dataSize) throws IOException {
+    long objectId = UUID.randomUUID().getMostSignificantBits() &
+        Long.MAX_VALUE; // Ensure positive ID
+    writeOpenKeyToOm(reconOMMetadataManager, key, bucket, volume, null,
+        dataSize);
+    return objectId;
+  }
+
+  private static BucketLayout getFSOBucketLayout() {
+    return BucketLayout.FILE_SYSTEM_OPTIMIZED;
+  }
+
+  private static BucketLayout getOBSBucketLayout() {
+    return BucketLayout.OBJECT_STORE;
+  }
+
+  private static BucketLayout getLegacyBucketLayout() {
+    return BucketLayout.LEGACY;
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to