This is an automated email from the ASF dual-hosted git repository.

aicam pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new 46803c39c9 fix: handle LakeFS pagination to return all results beyond 
default 100-item limit (#4349)
46803c39c9 is described below

commit 46803c39c9057029378efebfd1ce6139eed618f9
Author: Xuan Gu <[email protected]>
AuthorDate: Thu Apr 9 11:55:03 2026 -0700

    fix: handle LakeFS pagination to return all results beyond default 100-item 
limit (#4349)
    
    <!--
    Thanks for sending a pull request (PR)! Here are some tips for you:
    1. If this is your first time, please read our contributor guidelines:
    [Contributing to
    Texera](https://github.com/apache/texera/blob/main/CONTRIBUTING.md)
      2. Ensure you have added or run the appropriate tests for your PR
      3. If the PR is work in progress, mark it a draft on GitHub.
      4. Please write your PR title to summarize what this PR proposes, we
        are following Conventional Commits style for PR titles as well.
      5. Be sure to keep the PR description updated to reflect all changes.
    -->
    
    ### What changes were proposed in this PR?
    <!--
    Please clarify what changes you are proposing. The purpose of this
    section
    is to outline the changes. Here are some tips for you:
      1. If you propose a new API, clarify the use case for a new API.
      2. If you fix a bug, you can clarify why it is a bug.
      3. If it is a refactoring, clarify what has been changed.
      3. It would be helpful to include a before-and-after comparison using
         screenshots or GIFs.
      4. Please consider writing useful notes for better and faster reviews.
    -->
    This PR fixes the issue where retrieveUncommittedObjects and
    retrieveObjectsOfVersion only returned the first page of results
    (default 100 items). It adds a `fetchAllPages` method to retrieve the
    full result set across all pages. Each request sets `.amount(1000)` to
    reduce the number of round trips when fetching paginated results.
    
    When uploading 141 files:
    | Before (only display 0-99) | After (0-140) |
    |---|---|
    | <img width="400" alt="before"
    
src="https://github.com/user-attachments/assets/184f37cc-03f0-454a-acd0-130d3668aede";
    /> | <img width="400" alt="after"
    
src="https://github.com/user-attachments/assets/79db162f-7ef7-45d4-8031-e70770ce47f1";
    /> |
    
    ### Any related issues, documentation, discussions?
    <!--
    Please use this section to link other resources if not mentioned
    already.
    1. If this PR fixes an issue, please include `Fixes #1234`, `Resolves
    #1234`
    or `Closes #1234`. If it is only related, simply mention the issue
    number.
      2. If there is design documentation, please add the link.
      3. If there is a discussion in the mailing list, please add the link.
    -->
    Resolves #4343
    
    ### How was this PR tested?
    <!--
    If tests were added, say they were added here. Or simply mention that if
    the PR
    is tested with existing test cases. Make sure to include/update test
    cases that
    check the changes thoroughly including negative and positive cases if
    possible.
    If it was tested in a way different from regular unit tests, please
    clarify how
    you tested step by step, ideally copy and paste-able, so that other
    reviewers can
    test and check, and descendants can verify in the future. If tests were
    not added,
    please describe why they were not added and/or why it was difficult to
    add.
    -->
    Added a test that uploads 110 files to a temporary repo and verifies
    that both retrieveUncommittedObjects (before commit) and
    retrieveObjectsOfVersion (after commit) return all 110 items.
    
    ### Was this PR authored or co-authored using generative AI tooling?
    <!--
    If generative AI tooling has been used in the process of authoring this
    PR,
    please include the phrase: 'Generated-by: ' followed by the name of the
    tool
    and its version. If no, write 'No'.
    Please refer to the [ASF Generative Tooling
    Guidance](https://www.apache.org/legal/generative-tooling.html) for
    details.
    -->
    Generated-by: Claude Opus 4.6
    
    ---------
    
    Co-authored-by: Chen Li <[email protected]>
    Co-authored-by: ali risheh <[email protected]>
---
 .../core/storage/util/LakeFSStorageClient.scala    | 45 ++++++++++++++++++----
 .../service/resource/DatasetResourceSpec.scala     | 26 +++++++++++++
 2 files changed, 64 insertions(+), 7 deletions(-)

diff --git 
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/util/LakeFSStorageClient.scala
 
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/util/LakeFSStorageClient.scala
index 09fa6f3eb3..14737b9800 100644
--- 
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/util/LakeFSStorageClient.scala
+++ 
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/util/LakeFSStorageClient.scala
@@ -35,6 +35,9 @@ import scala.jdk.CollectionConverters._
   */
 object LakeFSStorageClient {
 
+  // Maximum number of results per LakeFS API request (pagination page size)
+  private val PageSize = 1000
+
   private lazy val apiClient: ApiClient = {
     val client = new ApiClient()
     client.setApiKey(StorageConfig.lakefsPassword)
@@ -300,8 +303,36 @@ object LakeFSStorageClient {
       .sortBy(_.getCreationDate)(Ordering[java.lang.Long].reverse) // Sort in 
descending order
   }
 
+  /**
+    * Fetches all pages from a paginated LakeFS API call.
+    *
+    * @param fetch A function that takes a pagination cursor and returns 
(results, pagination).
+    * @return All results across all pages.
+    */
+  private def fetchAllPages[T](
+      fetch: String => (java.util.List[T], Pagination)
+  ): List[T] = {
+    val allResults = scala.collection.mutable.ListBuffer[T]()
+    var hasMore = true
+    var after = "" // Pagination cursor returned by LakeFS
+
+    while (hasMore) {
+      val (results, pagination) = fetch(after)
+      allResults ++= results.asScala
+      hasMore = pagination.getHasMore
+      if (hasMore) after = pagination.getNextOffset
+    }
+
+    allResults.toList
+  }
+
   def retrieveObjectsOfVersion(repoName: String, commitHash: String): 
List[ObjectStats] = {
-    objectsApi.listObjects(repoName, 
commitHash).execute().getResults.asScala.toList
+    fetchAllPages[ObjectStats] { after =>
+      val request = objectsApi.listObjects(repoName, 
commitHash).amount(PageSize)
+      if (after.nonEmpty) request.after(after)
+      val response = request.execute()
+      (response.getResults, response.getPagination)
+    }
   }
 
   def retrieveRepositorySize(repoName: String, commitHash: String = ""): Long 
= {
@@ -334,12 +365,12 @@ object LakeFSStorageClient {
     * @return List of uncommitted object stats.
     */
   def retrieveUncommittedObjects(repoName: String): List[Diff] = {
-    branchesApi
-      .diffBranch(repoName, branchName)
-      .execute()
-      .getResults
-      .asScala
-      .toList
+    fetchAllPages[Diff] { after =>
+      val request = branchesApi.diffBranch(repoName, 
branchName).amount(PageSize)
+      if (after.nonEmpty) request.after(after)
+      val response = request.execute()
+      (response.getResults, response.getPagination)
+    }
   }
 
   def createCommit(repoName: String, branch: String, commitMessage: String): 
Commit = {
diff --git 
a/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala
 
b/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala
index a71861491a..7c94dc8fe9 100644
--- 
a/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala
+++ 
b/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala
@@ -2445,4 +2445,30 @@ class DatasetResourceSpec
     fetchSession(filePath) shouldBe null
     fetchPartRows(uploadId) shouldBe empty
   }
+
+  // 
===========================================================================
+  // Pagination test – verify that listing APIs return more than the default 
(100 items)
+  // 
===========================================================================
+
+  "LakeFS pagination" should "return all files when count exceeds one page for 
both uncommitted and committed objects" taggedAs Slow in {
+    val repoName =
+      
s"pagination-${System.nanoTime()}-${Random.alphanumeric.take(6).mkString.toLowerCase}"
+    LakeFSStorageClient.initRepo(repoName)
+
+    val totalFiles = 110
+    (1 to totalFiles).foreach { i =>
+      LakeFSStorageClient.writeFileToRepo(
+        repoName,
+        s"file-$i.txt",
+        new 
ByteArrayInputStream(s"content-$i".getBytes(StandardCharsets.UTF_8))
+      )
+    }
+
+    // before commit: 110 files should appear as uncommitted diffs
+    LakeFSStorageClient.retrieveUncommittedObjects(repoName).size shouldEqual 
totalFiles
+
+    // after commit: 110 files should appear as committed objects
+    val commit = LakeFSStorageClient.withCreateVersion(repoName, "commit all 
files") {}
+    LakeFSStorageClient.retrieveObjectsOfVersion(repoName, commit.getId).size 
shouldEqual totalFiles
+  }
 }

Reply via email to