This is an automated email from the ASF dual-hosted git repository.

striker pushed a commit to branch striker/speculative-actions
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit a233e66d2757c31e549faaaeeb255846efbaaaef
Author: Sander Striker <[email protected]>
AuthorDate: Mon Mar 16 18:28:12 2026 +0100

    speculative actions: Add scheduler queues and pipeline wiring
    
    Scheduler queues:
    - SpeculativeActionGenerationQueue: runs after BuildQueue, extracts
      subaction digests, generates overlays, stores SA by weak key
    - SpeculativeCachePrimingQueue: runs after PullQueue, retrieves stored
      SA, instantiates with current dep digests, submits Execute to
      buildbox-casd's local execution scheduler for verified caching
    
    Pipeline wiring:
    - _stream.py: conditionally adds queues when speculative-actions enabled
    - _context.py: reads speculative-actions scheduler config flag
    - element.py: _get_weak_cache_key(), subaction digest storage,
      _assemble() transfers digests from sandbox after build
    - _artifactcache.py: store/get_speculative_actions() with weak key path
    - _cas/cascache.py: fetch_proto()/store_proto() for SA serialization
    - sandbox.py: accumulates subaction digests across sandbox.run() calls
    - _sandboxreapi.py: reads action_result.subactions after execution
    
    Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
---
 src/buildstream/_artifactcache.py                  |  80 +++++++++
 src/buildstream/_cas/cascache.py                   |  94 ++++++++++
 src/buildstream/_context.py                        |   8 +-
 src/buildstream/_scheduler/__init__.py             |   2 +
 .../queues/speculativeactiongenerationqueue.py     | 105 +++++++++++
 .../queues/speculativecacheprimingqueue.py         | 195 +++++++++++++++++++++
 src/buildstream/_stream.py                         |  13 ++
 src/buildstream/element.py                         |  44 +++++
 src/buildstream/sandbox/_sandboxreapi.py           |   4 +
 src/buildstream/sandbox/sandbox.py                 |  24 +++
 10 files changed, 568 insertions(+), 1 deletion(-)

diff --git a/src/buildstream/_artifactcache.py b/src/buildstream/_artifactcache.py
index c8328f109..40b390aa2 100644
--- a/src/buildstream/_artifactcache.py
+++ b/src/buildstream/_artifactcache.py
@@ -473,3 +473,83 @@ class ArtifactCache(AssetCache):
             return bool(response)
         except AssetCacheError as e:
             raise ArtifactError("{}".format(e), temporary=True) from e
+
+    # store_speculative_actions():
+    #
+    # Store SpeculativeActions for an element's artifact.
+    #
+    # Stores using both the artifact proto field (backward compat) and
+    # a weak key reference (stable across dependency version changes).
+    #
+    # Args:
+    #     artifact (Artifact): The artifact to attach speculative actions to
+    #     spec_actions (SpeculativeActions): The speculative actions proto
+    #     weak_key (str): Optional weak cache key for stable lookup
+    #
+    def store_speculative_actions(self, artifact, spec_actions, weak_key=None):
+
+        # Store the speculative actions proto in CAS
+        spec_actions_digest = self.cas.store_proto(spec_actions)
+
+        # Load the artifact proto
+        artifact_proto = artifact._get_proto()
+
+        # Set the speculative_actions field (backward compat)
+        artifact_proto.speculative_actions.CopyFrom(spec_actions_digest)
+
+        # Save the updated artifact proto
+        ref = artifact._element.get_artifact_name(artifact.get_extract_key())
+        proto_path = os.path.join(self._basedir, ref)
+        with open(proto_path, mode="w+b") as f:
+            f.write(artifact_proto.SerializeToString())
+
+        # Store a weak key reference for stable lookup
+        if weak_key:
+            element = artifact._element
+            project = element._get_project()
+            sa_ref = "{}/{}/speculative-{}".format(project.name, element.name, weak_key)
+            sa_ref_path = os.path.join(self._basedir, sa_ref)
+            os.makedirs(os.path.dirname(sa_ref_path), exist_ok=True)
+            with open(sa_ref_path, mode="w+b") as f:
+                f.write(spec_actions.SerializeToString())
+
+    # get_speculative_actions():
+    #
+    # Retrieve SpeculativeActions for an element's artifact.
+    #
+    # First tries the weak key path (stable across dependency version
+    # changes), then falls back to the artifact proto field.
+    #
+    # Args:
+    #     artifact (Artifact): The artifact to get speculative actions from
+    #     weak_key (str): Optional weak cache key for stable lookup
+    #
+    # Returns:
+    #     SpeculativeActions proto or None if not available
+    #
+    def get_speculative_actions(self, artifact, weak_key=None):
+        from ._protos.buildstream.v2 import speculative_actions_pb2
+
+        # Try weak key lookup first (stable across dependency version changes)
+        if weak_key:
+            element = artifact._element
+            project = element._get_project()
+            sa_ref = "{}/{}/speculative-{}".format(project.name, element.name, weak_key)
+            sa_ref_path = os.path.join(self._basedir, sa_ref)
+            if os.path.exists(sa_ref_path):
+                spec_actions = speculative_actions_pb2.SpeculativeActions()
+                with open(sa_ref_path, mode="r+b") as f:
+                    spec_actions.ParseFromString(f.read())
+                return spec_actions
+
+        # Fallback: load from artifact proto field
+        artifact_proto = artifact._get_proto()
+        if not artifact_proto:
+            return None
+
+        # Check if speculative_actions field is set
+        if not artifact_proto.HasField("speculative_actions"):
+            return None
+
+        # Fetch the speculative actions from CAS
+        return self.cas.fetch_proto(artifact_proto.speculative_actions, speculative_actions_pb2.SpeculativeActions)
diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py
index 68fd4b610..92c640f28 100644
--- a/src/buildstream/_cas/cascache.py
+++ b/src/buildstream/_cas/cascache.py
@@ -703,6 +703,100 @@ class CASCache:
     def get_cache_usage(self):
         return self._cache_usage_monitor.get_cache_usage()
 
+    # fetch_proto():
+    #
+    # Fetch a protobuf message from CAS by digest and parse it.
+    #
+    # Args:
+    #     digest (Digest): The digest of the proto message
+    #     proto_class: The protobuf message class to parse into
+    #
+    # Returns:
+    #     The parsed protobuf message, or None if not found
+    #
+    def fetch_proto(self, digest, proto_class):
+        if not digest or not digest.hash:
+            return None
+
+        try:
+            with self.open(digest, mode="rb") as f:
+                proto_instance = proto_class()
+                proto_instance.ParseFromString(f.read())
+                return proto_instance
+        except FileNotFoundError:
+            return None
+        except Exception:
+            return None
+
+    # store_proto():
+    #
+    # Store a protobuf message in CAS.
+    #
+    # Args:
+    #     proto: The protobuf message instance
+    #     instance_name (str): Optional casd instance_name for remote CAS
+    #
+    # Returns:
+    #     (Digest): The digest of the stored proto
+    #
+    def store_proto(self, proto, instance_name=None):
+        buffer = proto.SerializeToString()
+        return self.add_object(buffer=buffer, instance_name=instance_name)
+
+    # fetch_action():
+    #
+    # Fetch an Action proto from CAS.
+    #
+    # Args:
+    #     action_digest (Digest): The digest of the Action
+    #
+    # Returns:
+    #     Action proto or None if not found
+    #
+    def fetch_action(self, action_digest):
+        return self.fetch_proto(action_digest, remote_execution_pb2.Action)
+
+    # store_action():
+    #
+    # Store an Action proto in CAS.
+    #
+    # Args:
+    #     action (Action): The Action proto
+    #     instance_name (str): Optional casd instance_name
+    #
+    # Returns:
+    #     (Digest): The digest of the stored action
+    #
+    def store_action(self, action, instance_name=None):
+        return self.store_proto(action, instance_name=instance_name)
+
+    # fetch_directory():
+    #
+    # Fetch a Directory proto from CAS (not the full tree).
+    #
+    # Args:
+    #     directory_digest (Digest): The digest of the Directory
+    #
+    # Returns:
+    #     Directory proto or None if not found
+    #
+    def fetch_directory_proto(self, directory_digest):
+        return self.fetch_proto(directory_digest, remote_execution_pb2.Directory)
+
+    # store_directory():
+    #
+    # Store a Directory proto in CAS.
+    #
+    # Args:
+    #     directory (Directory): The Directory proto
+    #     instance_name (str): Optional casd instance_name
+    #
+    # Returns:
+    #     (Digest): The digest of the stored directory
+    #
+    def store_directory_proto(self, directory, instance_name=None):
+        return self.store_proto(directory, instance_name=instance_name)
+
 
 # _CASCacheUsage
 #
diff --git a/src/buildstream/_context.py b/src/buildstream/_context.py
index 48e8eff43..9cf89854f 100644
--- a/src/buildstream/_context.py
+++ b/src/buildstream/_context.py
@@ -164,6 +164,9 @@ class Context:
         # What to do when a build fails in non interactive mode
         self.sched_error_action: Optional[str] = None
 
+        # Whether speculative actions are enabled
+        self.speculative_actions: bool = False
+
         # Maximum jobs per build
         self.build_max_jobs: Optional[int] = None
 
@@ -451,13 +454,16 @@ class Context:
 
         # Load scheduler config
         scheduler = defaults.get_mapping("scheduler")
-        scheduler.validate_keys(["on-error", "fetchers", "builders", "pushers", "network-retries"])
+        scheduler.validate_keys(["on-error", "fetchers", "builders", "pushers", "network-retries", "speculative-actions"])
         self.sched_error_action = scheduler.get_enum("on-error", _SchedulerErrorAction)
         self.sched_fetchers = scheduler.get_int("fetchers")
         self.sched_builders = scheduler.get_int("builders")
         self.sched_pushers = scheduler.get_int("pushers")
         self.sched_network_retries = scheduler.get_int("network-retries")
 
+        # Load speculative actions config
+        self.speculative_actions = scheduler.get_bool("speculative-actions")
+
         # Load build config
         build = defaults.get_mapping("build")
         build.validate_keys(["max-jobs", "retry-failed", "dependencies"])
diff --git a/src/buildstream/_scheduler/__init__.py b/src/buildstream/_scheduler/__init__.py
index d37e47b6f..0b849045f 100644
--- a/src/buildstream/_scheduler/__init__.py
+++ b/src/buildstream/_scheduler/__init__.py
@@ -23,6 +23,8 @@ from .queues.buildqueue import BuildQueue
 from .queues.artifactpushqueue import ArtifactPushQueue
 from .queues.pullqueue import PullQueue
 from .queues.cachequeryqueue import CacheQueryQueue
+from .queues.speculativeactiongenerationqueue import SpeculativeActionGenerationQueue
+from .queues.speculativecacheprimingqueue import SpeculativeCachePrimingQueue
 
 from .scheduler import Scheduler, SchedStatus
 from .jobs import ElementJob, JobStatus
diff --git a/src/buildstream/_scheduler/queues/speculativeactiongenerationqueue.py b/src/buildstream/_scheduler/queues/speculativeactiongenerationqueue.py
new file mode 100644
index 000000000..c7397c5cc
--- /dev/null
+++ b/src/buildstream/_scheduler/queues/speculativeactiongenerationqueue.py
@@ -0,0 +1,105 @@
+#
+#  Copyright 2025 The Apache Software Foundation
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+"""
+SpeculativeActionGenerationQueue
+=================================
+
+Queue for generating SpeculativeActions after element builds.
+
+This queue runs after BuildQueue to:
+1. Extract subaction digests from built elements
+2. Generate SOURCE and ARTIFACT overlays
+3. Store SpeculativeActions with the artifact
+"""
+
+# Local imports
+from . import Queue, QueueStatus
+from ..jobs import JobStatus
+
+
+# A queue which generates speculative actions for built elements
+#
+class SpeculativeActionGenerationQueue(Queue):
+
+    action_name = "Generating overlays"
+    complete_name = "Overlays generated"
+    resources = []  # No special resources needed
+
+    def get_process_func(self):
+        return SpeculativeActionGenerationQueue._generate_overlays
+
+    def status(self, element):
+        # Only process elements that were successfully built
+        # and have subaction digests
+        if not element._cached_success():
+            return QueueStatus.SKIP
+
+        # Check if element has subaction digests
+        subaction_digests = element._get_subaction_digests()
+        if not subaction_digests:
+            return QueueStatus.SKIP
+
+        return QueueStatus.READY
+
+    def done(self, _, element, result, status):
+        if status is JobStatus.FAIL:
+            # Generation is best-effort, don't fail the build
+            pass
+
+        # Result contains the SpeculativeActions that were generated
+        # The artifact cache has already been updated in the child process
+
+    @staticmethod
+    def _generate_overlays(element):
+        """
+        Generate SpeculativeActions for an element.
+
+        Args:
+            element: The element to generate overlays for
+
+        Returns:
+            Number of actions generated, or None if skipped
+        """
+        from ..._speculative_actions.generator import SpeculativeActionsGenerator
+
+        # Get subaction digests
+        subaction_digests = element._get_subaction_digests()
+        if not subaction_digests:
+            return None
+
+        # Get the context and caches
+        context = element._get_context()
+        cas = context.get_cascache()
+        artifactcache = context.artifactcache
+
+        # Get dependencies to resolve overlays
+        from ...types import _Scope
+
+        dependencies = list(element._dependencies(_Scope.BUILD, recurse=False))
+
+        # Generate overlays
+        generator = SpeculativeActionsGenerator(cas)
+        spec_actions = generator.generate_speculative_actions(element, subaction_digests, dependencies)
+
+        if not spec_actions or not spec_actions.actions:
+            return 0
+
+        # Store with the artifact, using weak key for stable lookup
+        artifact = element._get_artifact()
+        weak_key = element._get_weak_cache_key()
+        artifactcache.store_speculative_actions(artifact, spec_actions, weak_key=weak_key)
+
+        return len(spec_actions.actions)
diff --git a/src/buildstream/_scheduler/queues/speculativecacheprimingqueue.py b/src/buildstream/_scheduler/queues/speculativecacheprimingqueue.py
new file mode 100644
index 000000000..1a0be9c15
--- /dev/null
+++ b/src/buildstream/_scheduler/queues/speculativecacheprimingqueue.py
@@ -0,0 +1,195 @@
+#
+#  Copyright 2025 The Apache Software Foundation
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+"""
+SpeculativeCachePrimingQueue
+=============================
+
+Queue for priming the remote ActionCache with speculative actions.
+
+This queue runs after PullQueue (in parallel with BuildQueue) to:
+1. Retrieve SpeculativeActions from pulled artifacts
+2. Instantiate actions by applying overlays
+3. Submit to execution via buildbox-casd to prime the ActionCache
+
+This enables parallelism: while elements build normally, we're priming
+the cache for other elements that will build later.
+"""
+
+# Local imports
+from . import Queue, QueueStatus
+from ..jobs import JobStatus
+from ..resources import ResourceType
+
+
+# A queue which primes the ActionCache with speculative actions
+#
+class SpeculativeCachePrimingQueue(Queue):
+
+    action_name = "Priming cache"
+    complete_name = "Cache primed"
+    resources = [ResourceType.UPLOAD]  # Uses network to submit actions
+
+    def get_process_func(self):
+        return SpeculativeCachePrimingQueue._prime_cache
+
+    def status(self, element):
+        # Only process elements that were pulled (not built locally)
+        # and are cached with SpeculativeActions
+        if not element._cached():
+            return QueueStatus.SKIP
+
+        # Check if element has SpeculativeActions (try weak key first)
+        context = element._get_context()
+        artifactcache = context.artifactcache
+        artifact = element._get_artifact()
+        weak_key = element._get_weak_cache_key()
+
+        spec_actions = artifactcache.get_speculative_actions(artifact, weak_key=weak_key)
+        if not spec_actions or not spec_actions.actions:
+            return QueueStatus.SKIP
+
+        return QueueStatus.READY
+
+    def done(self, _, element, result, status):
+        if status is JobStatus.FAIL:
+            # Priming is best-effort, don't fail the build
+            return
+
+        # Result contains number of actions submitted
+        if result:
+            primed_count, total_count = result
+            element.info(f"Primed {primed_count}/{total_count} actions")
+
+    @staticmethod
+    def _prime_cache(element):
+        """
+        Prime the ActionCache for an element.
+
+        Retrieves stored SpeculativeActions, instantiates them with
+        current dependency digests, and submits each adapted action
+        to buildbox-casd's execution service. The execution produces
+        verified ActionResults that get cached, so subsequent builds
+        can hit the action cache instead of rebuilding.
+
+        Args:
+            element: The element to prime cache for
+
+        Returns:
+            Tuple of (primed_count, total_count) or None if skipped
+        """
+        from ..._speculative_actions.instantiator import SpeculativeActionInstantiator
+
+        # Get the context and caches
+        context = element._get_context()
+        cas = context.get_cascache()
+        artifactcache = context.artifactcache
+
+        # Get SpeculativeActions (try weak key first)
+        artifact = element._get_artifact()
+        weak_key = element._get_weak_cache_key()
+        spec_actions = artifactcache.get_speculative_actions(artifact, weak_key=weak_key)
+        if not spec_actions or not spec_actions.actions:
+            return None
+
+        # Build element lookup for dependency resolution
+        from ...types import _Scope
+
+        dependencies = list(element._dependencies(_Scope.BUILD, recurse=True))
+        element_lookup = {dep.name: dep for dep in dependencies}
+        element_lookup[element.name] = element  # Include self
+
+        # Instantiate and submit each action
+        instantiator = SpeculativeActionInstantiator(cas, artifactcache)
+        primed_count = 0
+        total_count = len(spec_actions.actions)
+
+        # Get the execution service from buildbox-casd
+        casd = context.get_casd()
+        exec_service = casd._exec_service
+        if not exec_service:
+            element.warn("No execution service available for speculative action priming")
+            return None
+
+        for spec_action in spec_actions.actions:
+            try:
+                # Instantiate action by applying overlays
+                action_digest = instantiator.instantiate_action(spec_action, element, element_lookup)
+
+                if not action_digest:
+                    continue
+
+                # Submit to buildbox-casd's execution service.
+                # casd runs the action via its local execution scheduler
+                # (buildbox-run), producing a verified ActionResult that
+                # gets stored in the action cache.
+                if SpeculativeCachePrimingQueue._submit_action(
+                    exec_service, action_digest, element
+                ):
+                    primed_count += 1
+
+            except Exception as e:
+                # Best-effort: log but continue with other actions
+                element.warn(f"Failed to prime action: {e}")
+                continue
+
+        return (primed_count, total_count)
+
+    @staticmethod
+    def _submit_action(exec_service, action_digest, element):
+        """
+        Submit an action to buildbox-casd's execution service.
+
+        This sends an Execute request to the local buildbox-casd, which
+        runs the action via its local execution scheduler (using
+        buildbox-run). The resulting ActionResult is stored in the
+        action cache, making it available for future builds.
+
+        Args:
+            exec_service: The gRPC ExecutionStub for buildbox-casd
+            action_digest: The Action digest to execute
+            element: The element (for logging)
+
+        Returns:
+            bool: True if submitted successfully
+        """
+        try:
+            from ..._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
+
+            request = remote_execution_pb2.ExecuteRequest(
+                action_digest=action_digest,
+                skip_cache_lookup=False,  # Check ActionCache first
+            )
+
+            # Submit Execute request. The response is a stream of
+            # Operation messages. We consume the stream to ensure the
+            # action completes and the result is cached.
+            operation_stream = exec_service.Execute(request)
+            for operation in operation_stream:
+                if operation.done:
+                    # Check if the operation completed successfully
+                    if operation.HasField("error"):
+                        element.warn(
+                            f"Priming action failed: {operation.error.message}"
+                        )
+                        return False
+                    return True
+
+            # Stream ended without a done operation
+            return False
+
+        except Exception as e:
+            element.warn(f"Failed to submit priming action: {e}")
+            return False
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index a475bdb41..36d67b5ed 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -41,6 +41,8 @@ from ._scheduler import (
     BuildQueue,
     PullQueue,
     ArtifactPushQueue,
+    SpeculativeActionGenerationQueue,
+    SpeculativeCachePrimingQueue,
 )
 from .element import Element
 from ._profile import Topics, PROFILER
@@ -429,8 +431,19 @@ class Stream:
 
         self._add_queue(FetchQueue(self._scheduler, skip_cached=True))
 
+        if self._context.speculative_actions:
+            # Priming queue: For each element, instantiate and submit its speculative
+            # actions to warm the remote ActionCache BEFORE the element reaches BuildQueue.
+            # Must come after FetchQueue so sources are available for resolving SOURCE overlays.
+            self._add_queue(SpeculativeCachePrimingQueue(self._scheduler))
+
         self._add_queue(BuildQueue(self._scheduler, imperative=True))
 
+        if self._context.speculative_actions:
+            # Generation queue: After each build, extract subactions and generate
+            # overlays so future builds can benefit from cache priming.
+            self._add_queue(SpeculativeActionGenerationQueue(self._scheduler))
+
         if self._artifacts.has_push_remotes():
             self._add_queue(ArtifactPushQueue(self._scheduler, skip_uncached=True))
 
diff --git a/src/buildstream/element.py b/src/buildstream/element.py
index aede8f1ea..8a5377ac4 100644
--- a/src/buildstream/element.py
+++ b/src/buildstream/element.py
@@ -307,6 +307,9 @@ class Element(Plugin):
         self.__variables: Optional[Variables] = None
         self.__dynamic_public_guard = Lock()
 
+        # Speculative actions support
+        self.__subaction_digests = []  # Subaction digests from the build's ActionResult
+
         if artifact_key:
             self.__initialize_from_artifact_key(artifact_key)
         else:
@@ -1725,6 +1728,9 @@ class Element(Plugin):
                     collect = self.assemble(sandbox)  # pylint: disable=assignment-from-no-return
 
                     self.__set_build_result(success=True, description="succeeded")
+
+                    # Collect subaction digests recorded during the build
+                    self._set_subaction_digests(sandbox._get_subaction_digests())
                 except (ElementError, SandboxCommandError) as e:
                     # Shelling into a sandbox is useful to debug this error
                     e.sandbox = True
@@ -1803,6 +1809,43 @@ class Element(Plugin):
                 "unable to collect artifact contents".format(collect)
             )
 
+    # _set_subaction_digests():
+    #
+    # Set the subaction digests captured from the build's ActionResult.
+    # This is called after a successful build to store compiler invocations.
+    #
+    # Args:
+    #     subaction_digests: List of Digest protos from ActionResult.subactions
+    #
+    def _set_subaction_digests(self, subaction_digests):
+        self.__subaction_digests = list(subaction_digests) if subaction_digests else []
+
+    # _get_subaction_digests():
+    #
+    # Get the subaction digests from the build's ActionResult.
+    #
+    # Returns:
+    #     List of Digest protos, or empty list if none
+    #
+    def _get_subaction_digests(self):
+        return self.__subaction_digests
+
+    # _get_weak_cache_key():
+    #
+    # Get the weak cache key for this element.
+    #
+    # Used by speculative actions for stable lookup: the weak key includes
+    # everything about the element itself (sources, env, commands, sandbox)
+    # but only dependency *names* (not their cache keys), making it stable
+    # across dependency version changes while still changing when the
+    # element's own sources or configuration change.
+    #
+    # Returns:
+    #     (str): The weak cache key, or None if not yet computed
+    #
+    def _get_weak_cache_key(self):
+        return self.__weak_cache_key
+
     # _fetch_done()
     #
     # Indicates that fetching the sources for this element has been done.
@@ -3338,6 +3381,7 @@ class Element(Plugin):
             ]
             self.__weak_cache_key = self._calculate_cache_key(dependencies)
 
+
         context = self._get_context()
 
         # Calculate the strict cache key
diff --git a/src/buildstream/sandbox/_sandboxreapi.py b/src/buildstream/sandbox/_sandboxreapi.py
index be210e450..0360301cd 100644
--- a/src/buildstream/sandbox/_sandboxreapi.py
+++ b/src/buildstream/sandbox/_sandboxreapi.py
@@ -105,6 +105,10 @@ class SandboxREAPI(Sandbox):
             cwd, action_result.output_directories, action_result.output_files, failure=action_result.exit_code != 0
         )
 
+        # Collect subaction digests recorded during nested execution (if any)
+        if action_result.subactions:
+            self._collect_subaction_digests(action_result.subactions)
+
         # Non-zero exit code means a normal error during the build:
         # the remote execution system has worked correctly but the command failed.
         return action_result.exit_code
diff --git a/src/buildstream/sandbox/sandbox.py b/src/buildstream/sandbox/sandbox.py
index e266a95c3..75e69e23b 100644
--- a/src/buildstream/sandbox/sandbox.py
+++ b/src/buildstream/sandbox/sandbox.py
@@ -86,6 +86,7 @@ class Sandbox:
         self.__mount_sources = {}  # type: Dict[str, str]
         self.__allow_run = True
         self.__subsandboxes = []  # type: List[Sandbox]
+        self.__subaction_digests = []  # Subaction digests collected from ActionResults
 
         # Plugin element full name for logging
         plugin = kwargs.get("plugin", None)
@@ -562,6 +563,29 @@ class Sandbox:
     def _get_element_name(self):
         return self.__element_name
 
+    # _collect_subaction_digests()
+    #
+    # Store subaction digests from an ActionResult.
+    #
+    # Called by sandbox implementations after executing an action
+    # to collect subaction digests recorded by trexe.
+    #
+    # Args:
+    #    subactions: Iterable of Digest protos from ActionResult.subactions
+    #
+    def _collect_subaction_digests(self, subactions):
+        self.__subaction_digests.extend(subactions)
+
+    # _get_subaction_digests()
+    #
+    # Get subaction digests collected during sandbox execution.
+    #
+    # Returns:
+    #    (list): List of Digest protos
+    #
+    def _get_subaction_digests(self):
+        return self.__subaction_digests
+
     # _disable_run()
     #
     # Raise exception if `Sandbox.run()` is called.

Reply via email to