This is an automated email from the ASF dual-hosted git repository. striker pushed a commit to branch striker/speculative-actions in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit 9b68cdef7e9925919964445d62701948ec3fa88a Author: Sander Striker <[email protected]> AuthorDate: Mon Mar 16 18:28:12 2026 +0100 speculative actions: Add scheduler queues and pipeline wiring Scheduler queues: - SpeculativeActionGenerationQueue: runs after BuildQueue, extracts subaction digests, generates overlays, stores SA by weak key - SpeculativeCachePrimingQueue: runs after PullQueue, retrieves stored SA, instantiates with current dep digests, submits Execute to buildbox-casd's local execution scheduler for verified caching Pipeline wiring: - _stream.py: conditionally adds queues when speculative-actions enabled - _context.py: reads speculative-actions scheduler config flag - element.py: _get_weak_cache_key(), subaction digest storage, _assemble() transfers digests from sandbox after build - _artifactcache.py: store/get_speculative_actions() with weak key path - _cas/cascache.py: fetch_proto()/store_proto() for SA serialization - sandbox.py: accumulates subaction digests across sandbox.run() calls - _sandboxreapi.py: reads action_result.subactions after execution Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]> --- src/buildstream/_artifactcache.py | 80 +++++++++ src/buildstream/_cas/cascache.py | 94 ++++++++++ src/buildstream/_context.py | 8 +- src/buildstream/_scheduler/__init__.py | 2 + .../queues/speculativeactiongenerationqueue.py | 105 +++++++++++ .../queues/speculativecacheprimingqueue.py | 195 +++++++++++++++++++++ src/buildstream/_stream.py | 13 ++ src/buildstream/element.py | 44 +++++ src/buildstream/sandbox/_sandboxreapi.py | 4 + src/buildstream/sandbox/sandbox.py | 24 +++ 10 files changed, 568 insertions(+), 1 deletion(-) diff --git a/src/buildstream/_artifactcache.py b/src/buildstream/_artifactcache.py index c8328f109..40b390aa2 100644 --- a/src/buildstream/_artifactcache.py +++ b/src/buildstream/_artifactcache.py @@ -473,3 +473,83 @@ class ArtifactCache(AssetCache): return bool(response) except AssetCacheError as e: raise 
ArtifactError("{}".format(e), temporary=True) from e + + # store_speculative_actions(): + # + # Store SpeculativeActions for an element's artifact. + # + # Stores using both the artifact proto field (backward compat) and + # a weak key reference (stable across dependency version changes). + # + # Args: + # artifact (Artifact): The artifact to attach speculative actions to + # spec_actions (SpeculativeActions): The speculative actions proto + # weak_key (str): Optional weak cache key for stable lookup + # + def store_speculative_actions(self, artifact, spec_actions, weak_key=None): + + # Store the speculative actions proto in CAS + spec_actions_digest = self.cas.store_proto(spec_actions) + + # Load the artifact proto + artifact_proto = artifact._get_proto() + + # Set the speculative_actions field (backward compat) + artifact_proto.speculative_actions.CopyFrom(spec_actions_digest) + + # Save the updated artifact proto + ref = artifact._element.get_artifact_name(artifact.get_extract_key()) + proto_path = os.path.join(self._basedir, ref) + with open(proto_path, mode="w+b") as f: + f.write(artifact_proto.SerializeToString()) + + # Store a weak key reference for stable lookup + if weak_key: + element = artifact._element + project = element._get_project() + sa_ref = "{}/{}/speculative-{}".format(project.name, element.name, weak_key) + sa_ref_path = os.path.join(self._basedir, sa_ref) + os.makedirs(os.path.dirname(sa_ref_path), exist_ok=True) + with open(sa_ref_path, mode="w+b") as f: + f.write(spec_actions.SerializeToString()) + + # get_speculative_actions(): + # + # Retrieve SpeculativeActions for an element's artifact. + # + # First tries the weak key path (stable across dependency version + # changes), then falls back to the artifact proto field. 
+ # + # Args: + # artifact (Artifact): The artifact to get speculative actions from + # weak_key (str): Optional weak cache key for stable lookup + # + # Returns: + # SpeculativeActions proto or None if not available + # + def get_speculative_actions(self, artifact, weak_key=None): + from ._protos.buildstream.v2 import speculative_actions_pb2 + + # Try weak key lookup first (stable across dependency version changes) + if weak_key: + element = artifact._element + project = element._get_project() + sa_ref = "{}/{}/speculative-{}".format(project.name, element.name, weak_key) + sa_ref_path = os.path.join(self._basedir, sa_ref) + if os.path.exists(sa_ref_path): + spec_actions = speculative_actions_pb2.SpeculativeActions() + with open(sa_ref_path, mode="r+b") as f: + spec_actions.ParseFromString(f.read()) + return spec_actions + + # Fallback: load from artifact proto field + artifact_proto = artifact._get_proto() + if not artifact_proto: + return None + + # Check if speculative_actions field is set + if not artifact_proto.HasField("speculative_actions"): + return None + + # Fetch the speculative actions from CAS + return self.cas.fetch_proto(artifact_proto.speculative_actions, speculative_actions_pb2.SpeculativeActions) diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py index 68fd4b610..92c640f28 100644 --- a/src/buildstream/_cas/cascache.py +++ b/src/buildstream/_cas/cascache.py @@ -703,6 +703,100 @@ class CASCache: def get_cache_usage(self): return self._cache_usage_monitor.get_cache_usage() + # fetch_proto(): + # + # Fetch a protobuf message from CAS by digest and parse it. 
+ # + # Args: + # digest (Digest): The digest of the proto message + # proto_class: The protobuf message class to parse into + # + # Returns: + # The parsed protobuf message, or None if not found + # + def fetch_proto(self, digest, proto_class): + if not digest or not digest.hash: + return None + + try: + with self.open(digest, mode="rb") as f: + proto_instance = proto_class() + proto_instance.ParseFromString(f.read()) + return proto_instance + except FileNotFoundError: + return None + except Exception: + return None + + # store_proto(): + # + # Store a protobuf message in CAS. + # + # Args: + # proto: The protobuf message instance + # instance_name (str): Optional casd instance_name for remote CAS + # + # Returns: + # (Digest): The digest of the stored proto + # + def store_proto(self, proto, instance_name=None): + buffer = proto.SerializeToString() + return self.add_object(buffer=buffer, instance_name=instance_name) + + # fetch_action(): + # + # Fetch an Action proto from CAS. + # + # Args: + # action_digest (Digest): The digest of the Action + # + # Returns: + # Action proto or None if not found + # + def fetch_action(self, action_digest): + return self.fetch_proto(action_digest, remote_execution_pb2.Action) + + # store_action(): + # + # Store an Action proto in CAS. + # + # Args: + # action (Action): The Action proto + # instance_name (str): Optional casd instance_name + # + # Returns: + # (Digest): The digest of the stored action + # + def store_action(self, action, instance_name=None): + return self.store_proto(action, instance_name=instance_name) + + # fetch_directory_proto(): + # + # Fetch a Directory proto from CAS (not the full tree). + # + # Args: + # directory_digest (Digest): The digest of the Directory + # + # Returns: + # Directory proto or None if not found + # + def fetch_directory_proto(self, directory_digest): + return self.fetch_proto(directory_digest, remote_execution_pb2.Directory) + + # store_directory_proto(): + # + # Store a Directory proto in CAS. 
+ # + # Args: + # directory (Directory): The Directory proto + # instance_name (str): Optional casd instance_name + # + # Returns: + # (Digest): The digest of the stored directory + # + def store_directory_proto(self, directory, instance_name=None): + return self.store_proto(directory, instance_name=instance_name) + # _CASCacheUsage # diff --git a/src/buildstream/_context.py b/src/buildstream/_context.py index 48e8eff43..9cf89854f 100644 --- a/src/buildstream/_context.py +++ b/src/buildstream/_context.py @@ -164,6 +164,9 @@ class Context: # What to do when a build fails in non interactive mode self.sched_error_action: Optional[str] = None + # Whether speculative actions are enabled + self.speculative_actions: bool = False + # Maximum jobs per build self.build_max_jobs: Optional[int] = None @@ -451,13 +454,16 @@ class Context: # Load scheduler config scheduler = defaults.get_mapping("scheduler") - scheduler.validate_keys(["on-error", "fetchers", "builders", "pushers", "network-retries"]) + scheduler.validate_keys(["on-error", "fetchers", "builders", "pushers", "network-retries", "speculative-actions"]) self.sched_error_action = scheduler.get_enum("on-error", _SchedulerErrorAction) self.sched_fetchers = scheduler.get_int("fetchers") self.sched_builders = scheduler.get_int("builders") self.sched_pushers = scheduler.get_int("pushers") self.sched_network_retries = scheduler.get_int("network-retries") + # Load speculative actions config + self.speculative_actions = scheduler.get_bool("speculative-actions") + # Load build config build = defaults.get_mapping("build") build.validate_keys(["max-jobs", "retry-failed", "dependencies"]) diff --git a/src/buildstream/_scheduler/__init__.py b/src/buildstream/_scheduler/__init__.py index d37e47b6f..0b849045f 100644 --- a/src/buildstream/_scheduler/__init__.py +++ b/src/buildstream/_scheduler/__init__.py @@ -23,6 +23,8 @@ from .queues.buildqueue import BuildQueue from .queues.artifactpushqueue import ArtifactPushQueue from 
.queues.pullqueue import PullQueue from .queues.cachequeryqueue import CacheQueryQueue +from .queues.speculativeactiongenerationqueue import SpeculativeActionGenerationQueue +from .queues.speculativecacheprimingqueue import SpeculativeCachePrimingQueue from .scheduler import Scheduler, SchedStatus from .jobs import ElementJob, JobStatus diff --git a/src/buildstream/_scheduler/queues/speculativeactiongenerationqueue.py b/src/buildstream/_scheduler/queues/speculativeactiongenerationqueue.py new file mode 100644 index 000000000..c7397c5cc --- /dev/null +++ b/src/buildstream/_scheduler/queues/speculativeactiongenerationqueue.py @@ -0,0 +1,105 @@ +# +# Copyright 2025 The Apache Software Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +SpeculativeActionGenerationQueue +================================= + +Queue for generating SpeculativeActions after element builds. + +This queue runs after BuildQueue to: +1. Extract subaction digests from built elements +2. Generate SOURCE and ARTIFACT overlays +3. Store SpeculativeActions with the artifact +""" + +# Local imports +from . 
import Queue, QueueStatus +from ..jobs import JobStatus + + +# A queue which generates speculative actions for built elements +# +class SpeculativeActionGenerationQueue(Queue): + + action_name = "Generating overlays" + complete_name = "Overlays generated" + resources = [] # No special resources needed + + def get_process_func(self): + return SpeculativeActionGenerationQueue._generate_overlays + + def status(self, element): + # Only process elements that were successfully built + # and have subaction digests + if not element._cached_success(): + return QueueStatus.SKIP + + # Check if element has subaction digests + subaction_digests = element._get_subaction_digests() + if not subaction_digests: + return QueueStatus.SKIP + + return QueueStatus.READY + + def done(self, _, element, result, status): + if status is JobStatus.FAIL: + # Generation is best-effort, don't fail the build + pass + + # Result contains the number of SpeculativeActions that were generated (or None if skipped) + # The artifact cache has already been updated in the child process + + @staticmethod + def _generate_overlays(element): + """ + Generate SpeculativeActions for an element. 
+ + Args: + element: The element to generate overlays for + + Returns: + Number of actions generated, or None if skipped + """ + from ..._speculative_actions.generator import SpeculativeActionsGenerator + + # Get subaction digests + subaction_digests = element._get_subaction_digests() + if not subaction_digests: + return None + + # Get the context and caches + context = element._get_context() + cas = context.get_cascache() + artifactcache = context.artifactcache + + # Get dependencies to resolve overlays + from ...types import _Scope + + dependencies = list(element._dependencies(_Scope.BUILD, recurse=False)) + + # Generate overlays + generator = SpeculativeActionsGenerator(cas) + spec_actions = generator.generate_speculative_actions(element, subaction_digests, dependencies) + + if not spec_actions or not spec_actions.actions: + return 0 + + # Store with the artifact, using weak key for stable lookup + artifact = element._get_artifact() + weak_key = element._get_weak_cache_key() + artifactcache.store_speculative_actions(artifact, spec_actions, weak_key=weak_key) + + return len(spec_actions.actions) diff --git a/src/buildstream/_scheduler/queues/speculativecacheprimingqueue.py b/src/buildstream/_scheduler/queues/speculativecacheprimingqueue.py new file mode 100644 index 000000000..1a0be9c15 --- /dev/null +++ b/src/buildstream/_scheduler/queues/speculativecacheprimingqueue.py @@ -0,0 +1,195 @@ +# +# Copyright 2025 The Apache Software Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +""" +SpeculativeCachePrimingQueue +============================= + +Queue for priming the remote ActionCache with speculative actions. + +This queue runs after PullQueue (in parallel with BuildQueue) to: +1. Retrieve SpeculativeActions from pulled artifacts +2. Instantiate actions by applying overlays +3. Submit to execution via buildbox-casd to prime the ActionCache + +This enables parallelism: while elements build normally, we're priming +the cache for other elements that will build later. +""" + +# Local imports +from . import Queue, QueueStatus +from ..jobs import JobStatus +from ..resources import ResourceType + + +# A queue which primes the ActionCache with speculative actions +# +class SpeculativeCachePrimingQueue(Queue): + + action_name = "Priming cache" + complete_name = "Cache primed" + resources = [ResourceType.UPLOAD] # Uses network to submit actions + + def get_process_func(self): + return SpeculativeCachePrimingQueue._prime_cache + + def status(self, element): + # Only process elements that were pulled (not built locally) + # and are cached with SpeculativeActions + if not element._cached(): + return QueueStatus.SKIP + + # Check if element has SpeculativeActions (try weak key first) + context = element._get_context() + artifactcache = context.artifactcache + artifact = element._get_artifact() + weak_key = element._get_weak_cache_key() + + spec_actions = artifactcache.get_speculative_actions(artifact, weak_key=weak_key) + if not spec_actions or not spec_actions.actions: + return QueueStatus.SKIP + + return QueueStatus.READY + + def done(self, _, element, result, status): + if status is JobStatus.FAIL: + # Priming is best-effort, don't fail the build + return + + # Result contains number of actions submitted + if result: + primed_count, total_count = result + element.info(f"Primed {primed_count}/{total_count} actions") + + @staticmethod + def 
_prime_cache(element): + """ + Prime the ActionCache for an element. + + Retrieves stored SpeculativeActions, instantiates them with + current dependency digests, and submits each adapted action + to buildbox-casd's execution service. The execution produces + verified ActionResults that get cached, so subsequent builds + can hit the action cache instead of rebuilding. + + Args: + element: The element to prime cache for + + Returns: + Tuple of (primed_count, total_count) or None if skipped + """ + from ..._speculative_actions.instantiator import SpeculativeActionInstantiator + + # Get the context and caches + context = element._get_context() + cas = context.get_cascache() + artifactcache = context.artifactcache + + # Get SpeculativeActions (try weak key first) + artifact = element._get_artifact() + weak_key = element._get_weak_cache_key() + spec_actions = artifactcache.get_speculative_actions(artifact, weak_key=weak_key) + if not spec_actions or not spec_actions.actions: + return None + + # Build element lookup for dependency resolution + from ...types import _Scope + + dependencies = list(element._dependencies(_Scope.BUILD, recurse=True)) + element_lookup = {dep.name: dep for dep in dependencies} + element_lookup[element.name] = element # Include self + + # Instantiate and submit each action + instantiator = SpeculativeActionInstantiator(cas, artifactcache) + primed_count = 0 + total_count = len(spec_actions.actions) + + # Get the execution service from buildbox-casd + casd = context.get_casd() + exec_service = casd._exec_service + if not exec_service: + element.warn("No execution service available for speculative action priming") + return None + + for spec_action in spec_actions.actions: + try: + # Instantiate action by applying overlays + action_digest = instantiator.instantiate_action(spec_action, element, element_lookup) + + if not action_digest: + continue + + # Submit to buildbox-casd's execution service. 
+ # casd runs the action via its local execution scheduler + # (buildbox-run), producing a verified ActionResult that + # gets stored in the action cache. + if SpeculativeCachePrimingQueue._submit_action( + exec_service, action_digest, element + ): + primed_count += 1 + + except Exception as e: + # Best-effort: log but continue with other actions + element.warn(f"Failed to prime action: {e}") + continue + + return (primed_count, total_count) + + @staticmethod + def _submit_action(exec_service, action_digest, element): + """ + Submit an action to buildbox-casd's execution service. + + This sends an Execute request to the local buildbox-casd, which + runs the action via its local execution scheduler (using + buildbox-run). The resulting ActionResult is stored in the + action cache, making it available for future builds. + + Args: + exec_service: The gRPC ExecutionStub for buildbox-casd + action_digest: The Action digest to execute + element: The element (for logging) + + Returns: + bool: True if submitted successfully + """ + try: + from ..._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 + + request = remote_execution_pb2.ExecuteRequest( + action_digest=action_digest, + skip_cache_lookup=False, # Check ActionCache first + ) + + # Submit Execute request. The response is a stream of + # Operation messages. We consume the stream to ensure the + # action completes and the result is cached. 
+ operation_stream = exec_service.Execute(request) + for operation in operation_stream: + if operation.done: + # Check if the operation completed successfully + if operation.HasField("error"): + element.warn( + f"Priming action failed: {operation.error.message}" + ) + return False + return True + + # Stream ended without a done operation + return False + + except Exception as e: + element.warn(f"Failed to submit priming action: {e}") + return False diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index a475bdb41..36d67b5ed 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -41,6 +41,8 @@ from ._scheduler import ( BuildQueue, PullQueue, ArtifactPushQueue, + SpeculativeActionGenerationQueue, + SpeculativeCachePrimingQueue, ) from .element import Element from ._profile import Topics, PROFILER @@ -429,8 +431,19 @@ class Stream: self._add_queue(FetchQueue(self._scheduler, skip_cached=True)) + if self._context.speculative_actions: + # Priming queue: For each element, instantiate and submit its speculative + # actions to warm the remote ActionCache BEFORE the element reaches BuildQueue. + # Must come after FetchQueue so sources are available for resolving SOURCE overlays. + self._add_queue(SpeculativeCachePrimingQueue(self._scheduler)) + self._add_queue(BuildQueue(self._scheduler, imperative=True)) + if self._context.speculative_actions: + # Generation queue: After each build, extract subactions and generate + # overlays so future builds can benefit from cache priming. 
+ self._add_queue(SpeculativeActionGenerationQueue(self._scheduler)) + if self._artifacts.has_push_remotes(): self._add_queue(ArtifactPushQueue(self._scheduler, skip_uncached=True)) diff --git a/src/buildstream/element.py b/src/buildstream/element.py index aede8f1ea..8a5377ac4 100644 --- a/src/buildstream/element.py +++ b/src/buildstream/element.py @@ -307,6 +307,9 @@ class Element(Plugin): self.__variables: Optional[Variables] = None self.__dynamic_public_guard = Lock() + # Speculative actions support + self.__subaction_digests = [] # Subaction digests from the build's ActionResult + if artifact_key: self.__initialize_from_artifact_key(artifact_key) else: @@ -1725,6 +1728,9 @@ class Element(Plugin): collect = self.assemble(sandbox) # pylint: disable=assignment-from-no-return self.__set_build_result(success=True, description="succeeded") + + # Collect subaction digests recorded during the build + self._set_subaction_digests(sandbox._get_subaction_digests()) except (ElementError, SandboxCommandError) as e: # Shelling into a sandbox is useful to debug this error e.sandbox = True @@ -1803,6 +1809,43 @@ class Element(Plugin): "unable to collect artifact contents".format(collect) ) + # _set_subaction_digests(): + # + # Set the subaction digests captured from the build's ActionResult. + # This is called after a successful build to store compiler invocations. + # + # Args: + # subaction_digests: List of Digest protos from ActionResult.subactions + # + def _set_subaction_digests(self, subaction_digests): + self.__subaction_digests = list(subaction_digests) if subaction_digests else [] + + # _get_subaction_digests(): + # + # Get the subaction digests from the build's ActionResult. + # + # Returns: + # List of Digest protos, or empty list if none + # + def _get_subaction_digests(self): + return self.__subaction_digests + + # _get_weak_cache_key(): + # + # Get the weak cache key for this element. 
+ # + # Used by speculative actions for stable lookup: the weak key includes + # everything about the element itself (sources, env, commands, sandbox) + # but only dependency *names* (not their cache keys), making it stable + # across dependency version changes while still changing when the + # element's own sources or configuration change. + # + # Returns: + # (str): The weak cache key, or None if not yet computed + # + def _get_weak_cache_key(self): + return self.__weak_cache_key + # _fetch_done() # # Indicates that fetching the sources for this element has been done. @@ -3338,6 +3381,7 @@ class Element(Plugin): ] self.__weak_cache_key = self._calculate_cache_key(dependencies) + context = self._get_context() # Calculate the strict cache key diff --git a/src/buildstream/sandbox/_sandboxreapi.py b/src/buildstream/sandbox/_sandboxreapi.py index be210e450..0360301cd 100644 --- a/src/buildstream/sandbox/_sandboxreapi.py +++ b/src/buildstream/sandbox/_sandboxreapi.py @@ -105,6 +105,10 @@ class SandboxREAPI(Sandbox): cwd, action_result.output_directories, action_result.output_files, failure=action_result.exit_code != 0 ) + # Collect subaction digests recorded during nested execution (if any) + if action_result.subactions: + self._collect_subaction_digests(action_result.subactions) + # Non-zero exit code means a normal error during the build: # the remote execution system has worked correctly but the command failed. 
return action_result.exit_code diff --git a/src/buildstream/sandbox/sandbox.py b/src/buildstream/sandbox/sandbox.py index e266a95c3..75e69e23b 100644 --- a/src/buildstream/sandbox/sandbox.py +++ b/src/buildstream/sandbox/sandbox.py @@ -86,6 +86,7 @@ class Sandbox: self.__mount_sources = {} # type: Dict[str, str] self.__allow_run = True self.__subsandboxes = [] # type: List[Sandbox] + self.__subaction_digests = [] # Subaction digests collected from ActionResults # Plugin element full name for logging plugin = kwargs.get("plugin", None) @@ -562,6 +563,29 @@ class Sandbox: def _get_element_name(self): return self.__element_name + # _collect_subaction_digests() + # + # Store subaction digests from an ActionResult. + # + # Called by sandbox implementations after executing an action + # to collect subaction digests recorded by trexe. + # + # Args: + # subactions: Iterable of Digest protos from ActionResult.subactions + # + def _collect_subaction_digests(self, subactions): + self.__subaction_digests.extend(subactions) + + # _get_subaction_digests() + # + # Get subaction digests collected during sandbox execution. + # + # Returns: + # (list): List of Digest protos + # + def _get_subaction_digests(self): + return self.__subaction_digests + # _disable_run() # # Raise exception if `Sandbox.run()` is called.
