This is an automated email from the ASF dual-hosted git repository. striker pushed a commit to branch striker/speculative-actions in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit 9dfa676de24a9215a77038fb45a29f77cd8f8495 Author: Sander Striker <[email protected]> AuthorDate: Sat Mar 21 22:15:39 2026 +0100 speculative-actions: Cache AC results across priming passes Add per-element ActionResult cache to avoid redundant GetActionResult gRPC calls when checking ACTION overlay resolvability across background, incremental, and final priming passes. Once an ActionResult is found for an adapted action digest, it is cached and reused on subsequent passes. Negative results (action submitted but not yet complete) are NOT cached since the result may become available on the next pass. Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]> --- .../queues/speculativecacheprimingqueue.py | 41 +++++++++++++++------- 1 file changed, 29 insertions(+), 12 deletions(-) diff --git a/src/buildstream/_scheduler/queues/speculativecacheprimingqueue.py b/src/buildstream/_scheduler/queues/speculativecacheprimingqueue.py index 2cca3c6ca..d99eba663 100644 --- a/src/buildstream/_scheduler/queues/speculativecacheprimingqueue.py +++ b/src/buildstream/_scheduler/queues/speculativecacheprimingqueue.py @@ -174,6 +174,7 @@ class SpeculativeCachePrimingQueue(Queue): element.__priming_submitted = None element.__priming_spec_actions = None element.__priming_resolved = None + element.__priming_ac_cache = None # ----------------------------------------------------------------- # Background priming (runs in thread pool while element is PENDING) @@ -241,6 +242,9 @@ class SpeculativeCachePrimingQueue(Queue): submitted = getattr(element, "_SpeculativeCachePrimingQueue__priming_submitted", None) or set() # Per-SA resolution caches: {base_action_hash -> {target_hash -> new_digest}} resolved_caches = getattr(element, "_SpeculativeCachePrimingQueue__priming_resolved", None) or {} + # AC result cache: avoids redundant GetActionResult gRPCs across passes. + # Maps adapted_digest_hash -> ActionResult (or False for "checked, not found"). + ac_cache = getattr(element, "_SpeculativeCachePrimingQueue__priming_ac_cache", None) or {} # Pre-fetch CAS blobs only on first pass if not submitted: @@ -287,20 +291,32 @@ class SpeculativeCachePrimingQueue(Queue): if adapted is not None: # Producing action was instantiated — check if - # result is in AC - if ac_service: - try: - request = remote_execution_pb2.GetActionResultRequest( - action_digest=adapted, - ) - action_result = ac_service.GetActionResult(request) - if not action_result: - # Submitted but not yet complete — defer + # result is in AC (using cache to avoid redundant + # gRPC calls across passes) + cached_result = ac_cache.get(adapted.hash) + if cached_result is None: + # Not in cache — query AC + if ac_service: + try: + request = remote_execution_pb2.GetActionResultRequest( + action_digest=adapted, + ) + action_result = ac_service.GetActionResult(request) + if action_result: + ac_cache[adapted.hash] = action_result + else: + # Not yet complete — defer (don't cache + # negative result, it may complete later) + resolvable = False + break + except Exception: resolvable = False break - except Exception: - resolvable = False - break + elif cached_result is False: + # Previously checked and not found + resolvable = False + break + # else: cached_result is a valid ActionResult, proceed else: # Not in instantiated_actions — check if the # producing element has finished priming @@ -358,6 +374,7 @@ class SpeculativeCachePrimingQueue(Queue): element.__priming_submitted = submitted element.__priming_spec_actions = spec_actions element.__priming_resolved = resolved_caches + element.__priming_ac_cache = ac_cache # ----------------------------------------------------------------- # Final priming pass (runs as a job when element becomes READY)
