This is an automated email from the ASF dual-hosted git repository.

striker pushed a commit to branch striker/speculative-actions
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 706f4c8b23b1d5a0927e563a21b7284ed633df75
Author: Sander Striker <[email protected]>
AuthorDate: Sat Mar 21 22:33:08 2026 +0100

    speculative-actions: Deduplicate and parallelize FetchTree in prefetch
    
    Optimize _prefetch_cas_blobs to reduce remote FetchTree latency:
    
    1. Deduplicate input root digests — many subactions share the same
       input trees, so redundant FetchTree calls are eliminated.
    
    2. Issue FetchTree calls concurrently via ThreadPoolExecutor (up to
       16 workers) instead of sequentially.
    
    Individual FetchTree calls are preserved (no synthetic root) to
    maintain remote cache hit rates — input roots from actual builds
    are likely already cached on the remote.
    
    Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
---
 .../queues/speculativecacheprimingqueue.py         | 29 ++++++++++++++++++++--
 1 file changed, 27 insertions(+), 2 deletions(-)

diff --git a/src/buildstream/_scheduler/queues/speculativecacheprimingqueue.py b/src/buildstream/_scheduler/queues/speculativecacheprimingqueue.py
index d99eba663..9550402e3 100644
--- a/src/buildstream/_scheduler/queues/speculativecacheprimingqueue.py
+++ b/src/buildstream/_scheduler/queues/speculativecacheprimingqueue.py
@@ -412,7 +412,13 @@ class SpeculativeCachePrimingQueue(Queue):
 
     @staticmethod
     def _prefetch_cas_blobs(element, spec_actions, cas, artifactcache):
-        """Pre-fetch all CAS blobs needed for instantiation."""
+        """Pre-fetch all CAS blobs needed for instantiation.
+
+        Fetches base action blobs in a single batch, then deduplicates
+        input root digests and fetches directory trees concurrently.
+        """
+        from concurrent.futures import ThreadPoolExecutor, as_completed
+
         project = element._get_project()
         _, storage_remotes = artifactcache.get_remotes(project.name, False)
         remote = storage_remotes[0] if storage_remotes else None
@@ -431,14 +437,33 @@ class SpeculativeCachePrimingQueue(Queue):
             except Exception:
                 pass
 
+        # Collect and deduplicate input root digests
+        unique_roots = {}  # hash -> digest
         for digest in base_action_digests:
             try:
                 action = cas.fetch_action(digest)
                 if action and action.HasField("input_root_digest"):
-                    cas.fetch_directory(remote, action.input_root_digest)
+                    root = action.input_root_digest
+                    if root.hash not in unique_roots:
+                        unique_roots[root.hash] = root
+            except Exception:
+                pass
+
+        if not unique_roots:
+            return
+
+        # Fetch directory trees concurrently
+        def _fetch_tree(root_digest):
+            try:
+                cas.fetch_directory(remote, root_digest)
             except Exception:
                 pass
 
+        with ThreadPoolExecutor(max_workers=min(16, len(unique_roots))) as pool:
+            futures = [pool.submit(_fetch_tree, d) for d in unique_roots.values()]
+            for f in as_completed(futures):
+                pass  # Errors handled inside _fetch_tree
+
     @staticmethod
     def _submit_action_async(exec_service, action_digest, element):
         """Submit an Execute request fire-and-forget style.

Reply via email to