This is an automated email from the ASF dual-hosted git repository.
aicam pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 46803c39c9 fix: handle LakeFS pagination to return all results beyond
default 100-item limit (#4349)
46803c39c9 is described below
commit 46803c39c9057029378efebfd1ce6139eed618f9
Author: Xuan Gu <[email protected]>
AuthorDate: Thu Apr 9 11:55:03 2026 -0700
fix: handle LakeFS pagination to return all results beyond default 100-item
limit (#4349)
<!--
Thanks for sending a pull request (PR)! Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
[Contributing to
Texera](https://github.com/apache/texera/blob/main/CONTRIBUTING.md)
2. Ensure you have added or run the appropriate tests for your PR
3. If the PR is work in progress, mark it a draft on GitHub.
4. Please write your PR title to summarize what this PR proposes, we
are following Conventional Commits style for PR titles as well.
5. Be sure to keep the PR description updated to reflect all changes.
-->
### What changes were proposed in this PR?
<!--
Please clarify what changes you are proposing. The purpose of this
section
is to outline the changes. Here are some tips for you:
1. If you propose a new API, clarify the use case for a new API.
2. If you fix a bug, you can clarify why it is a bug.
3. If it is a refactoring, clarify what has been changed.
3. It would be helpful to include a before-and-after comparison using
screenshots or GIFs.
4. Please consider writing useful notes for better and faster reviews.
-->
This PR fixes the issue where retrieveUncommittedObjects and
retrieveObjectsOfVersion only returned the first page of results
(default 100 items). It adds a `fetchAllPages` method to retrieve the
full result set across all pages. Each request sets `.amount(1000)` to
reduce the number of round trips when fetching paginated results.
When uploading 141 files:
| Before (only display 0-99) | After (0-140) |
|---|---|
| <img width="400" alt="before"
src="https://github.com/user-attachments/assets/184f37cc-03f0-454a-acd0-130d3668aede"
/> | <img width="400" alt="after"
src="https://github.com/user-attachments/assets/79db162f-7ef7-45d4-8031-e70770ce47f1"
/> |
### Any related issues, documentation, discussions?
<!--
Please use this section to link other resources if not mentioned
already.
1. If this PR fixes an issue, please include `Fixes #1234`, `Resolves
#1234`
or `Closes #1234`. If it is only related, simply mention the issue
number.
2. If there is design documentation, please add the link.
3. If there is a discussion in the mailing list, please add the link.
-->
Resolves #4343
### How was this PR tested?
<!--
If tests were added, say they were added here. Or simply mention that if
the PR
is tested with existing test cases. Make sure to include/update test
cases that
check the changes thoroughly including negative and positive cases if
possible.
If it was tested in a way different from regular unit tests, please
clarify how
you tested step by step, ideally copy and paste-able, so that other
reviewers can
test and check, and descendants can verify in the future. If tests were
not added,
please describe why they were not added and/or why it was difficult to
add.
-->
Added a test that uploads 110 files to a temporary repo and verifies
that both retrieveUncommittedObjects (before commit) and
retrieveObjectsOfVersion (after commit) return all 110 items.
### Was this PR authored or co-authored using generative AI tooling?
<!--
If generative AI tooling has been used in the process of authoring this
PR,
please include the phrase: 'Generated-by: ' followed by the name of the
tool
and its version. If no, write 'No'.
Please refer to the [ASF Generative Tooling
Guidance](https://www.apache.org/legal/generative-tooling.html) for
details.
-->
Generated-by: Claude Opus 4.6
---------
Co-authored-by: Chen Li <[email protected]>
Co-authored-by: ali risheh <[email protected]>
---
.../core/storage/util/LakeFSStorageClient.scala | 45 ++++++++++++++++++----
.../service/resource/DatasetResourceSpec.scala | 26 +++++++++++++
2 files changed, 64 insertions(+), 7 deletions(-)
diff --git
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/util/LakeFSStorageClient.scala
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/util/LakeFSStorageClient.scala
index 09fa6f3eb3..14737b9800 100644
---
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/util/LakeFSStorageClient.scala
+++
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/util/LakeFSStorageClient.scala
@@ -35,6 +35,9 @@ import scala.jdk.CollectionConverters._
*/
object LakeFSStorageClient {
+ // Maximum number of results per LakeFS API request (pagination page size)
+ private val PageSize = 1000
+
private lazy val apiClient: ApiClient = {
val client = new ApiClient()
client.setApiKey(StorageConfig.lakefsPassword)
@@ -300,8 +303,36 @@ object LakeFSStorageClient {
.sortBy(_.getCreationDate)(Ordering[java.lang.Long].reverse) // Sort in
descending order
}
+ /**
+ * Fetches all pages from a paginated LakeFS API call.
+ *
+ * @param fetch A function that takes a pagination cursor and returns
(results, pagination).
+ * @return All results across all pages.
+ */
+ private def fetchAllPages[T](
+ fetch: String => (java.util.List[T], Pagination)
+ ): List[T] = {
+ val allResults = scala.collection.mutable.ListBuffer[T]()
+ var hasMore = true
+ var after = "" // Pagination cursor returned by LakeFS
+
+ while (hasMore) {
+ val (results, pagination) = fetch(after)
+ allResults ++= results.asScala
+ hasMore = pagination.getHasMore
+ if (hasMore) after = pagination.getNextOffset
+ }
+
+ allResults.toList
+ }
+
def retrieveObjectsOfVersion(repoName: String, commitHash: String):
List[ObjectStats] = {
- objectsApi.listObjects(repoName,
commitHash).execute().getResults.asScala.toList
+ fetchAllPages[ObjectStats] { after =>
+ val request = objectsApi.listObjects(repoName,
commitHash).amount(PageSize)
+ if (after.nonEmpty) request.after(after)
+ val response = request.execute()
+ (response.getResults, response.getPagination)
+ }
}
def retrieveRepositorySize(repoName: String, commitHash: String = ""): Long
= {
@@ -334,12 +365,12 @@ object LakeFSStorageClient {
* @return List of uncommitted object stats.
*/
def retrieveUncommittedObjects(repoName: String): List[Diff] = {
- branchesApi
- .diffBranch(repoName, branchName)
- .execute()
- .getResults
- .asScala
- .toList
+ fetchAllPages[Diff] { after =>
+ val request = branchesApi.diffBranch(repoName,
branchName).amount(PageSize)
+ if (after.nonEmpty) request.after(after)
+ val response = request.execute()
+ (response.getResults, response.getPagination)
+ }
}
def createCommit(repoName: String, branch: String, commitMessage: String):
Commit = {
diff --git
a/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala
b/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala
index a71861491a..7c94dc8fe9 100644
---
a/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala
+++
b/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala
@@ -2445,4 +2445,30 @@ class DatasetResourceSpec
fetchSession(filePath) shouldBe null
fetchPartRows(uploadId) shouldBe empty
}
+
+ //
===========================================================================
+ // Pagination test – verify that listing APIs return more than the default
(100 items)
+ //
===========================================================================
+
+ "LakeFS pagination" should "return all files when count exceeds one page for
both uncommitted and committed objects" taggedAs Slow in {
+ val repoName =
+
s"pagination-${System.nanoTime()}-${Random.alphanumeric.take(6).mkString.toLowerCase}"
+ LakeFSStorageClient.initRepo(repoName)
+
+ val totalFiles = 110
+ (1 to totalFiles).foreach { i =>
+ LakeFSStorageClient.writeFileToRepo(
+ repoName,
+ s"file-$i.txt",
+ new
ByteArrayInputStream(s"content-$i".getBytes(StandardCharsets.UTF_8))
+ )
+ }
+
+ // before commit: 110 files should appear as uncommitted diffs
+ LakeFSStorageClient.retrieveUncommittedObjects(repoName).size shouldEqual
totalFiles
+
+ // after commit: 110 files should appear as committed objects
+ val commit = LakeFSStorageClient.withCreateVersion(repoName, "commit all
files") {}
+ LakeFSStorageClient.retrieveObjectsOfVersion(repoName, commit.getId).size
shouldEqual totalFiles
+ }
}