This is an automated email from the ASF dual-hosted git repository.

juergbi pushed a commit to branch jbilleter/action-cache
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 202ed5046dc68be25aa39592849e4f6e61bf5340
Author: Jürg Billeter <[email protected]>
AuthorDate: Fri Jun 27 18:17:45 2025 +0200

    sandbox: Create single buildbox-casd instance for remote execution
---
 src/buildstream/sandbox/_reremote.py      |  52 ++++++++++++++
 src/buildstream/sandbox/_sandboxremote.py | 110 ++++++------------------------
 2 files changed, 72 insertions(+), 90 deletions(-)

diff --git a/src/buildstream/sandbox/_reremote.py b/src/buildstream/sandbox/_reremote.py
new file mode 100644
index 000000000..d977e6696
--- /dev/null
+++ b/src/buildstream/sandbox/_reremote.py
@@ -0,0 +1,52 @@
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+
+from .._protos.build.buildgrid import local_cas_pb2
+
+from .._cas import CASRemote
+
+
+class RERemote(CASRemote):
+    def __init__(self, cas_spec, remote_execution_specs, cascache):
+        super().__init__(cas_spec, cascache)
+
+        self.remote_execution_specs = remote_execution_specs
+        self.exec_service = None
+        self.operations_service = None
+        self.ac_service = None
+
+    def close(self):
+        self.exec_service = None
+        self.operations_service = None
+        self.ac_service = None
+        super().close()
+
+    def _configure_protocols(self):
+        local_cas = self.cascache.get_local_cas()
+        request = local_cas_pb2.GetInstanceNameForRemotesRequest()
+        if self.remote_execution_specs.storage_spec:
+            self.remote_execution_specs.storage_spec.to_localcas_remote(request.cas)
+        else:
+            self.spec.to_localcas_remote(request.cas)
+        if self.remote_execution_specs.exec_spec:
+            self.remote_execution_specs.exec_spec.to_localcas_remote(request.execution)
+        if self.remote_execution_specs.action_spec:
+            self.remote_execution_specs.action_spec.to_localcas_remote(request.action_cache)
+        response = local_cas.GetInstanceNameForRemotes(request)
+        self.local_cas_instance_name = response.instance_name
+
+        casd = self.cascache.get_casd()
+        self.exec_service = casd.get_exec_service()
+        self.operations_service = casd.get_operations_service()
+        self.ac_service = casd.get_ac_service()
diff --git a/src/buildstream/sandbox/_sandboxremote.py b/src/buildstream/sandbox/_sandboxremote.py
index bb185b72b..dbd967a95 100644
--- a/src/buildstream/sandbox/_sandboxremote.py
+++ b/src/buildstream/sandbox/_sandboxremote.py
@@ -18,58 +18,13 @@ import shutil
 
 import grpc
 
+from ._reremote import RERemote
 from ._sandboxreapi import SandboxREAPI
 from .. import _signals
-from .._remote import BaseRemote
 from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
-from .._protos.build.buildgrid import local_cas_pb2
 from .._protos.google.rpc import code_pb2
 from .._exceptions import BstError, SandboxError
 from .._protos.google.longrunning import operations_pb2
-from .._cas import CASRemote
-
-
-class ExecutionRemote(BaseRemote):
-    def __init__(self, spec, casd):
-        super().__init__(spec)
-        self.casd = casd
-        self.instance_name = None
-        self.exec_service = None
-        self.operations_service = None
-
-    def close(self):
-        self.exec_service = None
-        self.operations_service = None
-        super().close()
-
-    def _configure_protocols(self):
-        local_cas = self.casd.get_local_cas()
-        request = local_cas_pb2.GetInstanceNameForRemotesRequest()
-        self.spec.to_localcas_remote(request.execution)
-        response = local_cas.GetInstanceNameForRemotes(request)
-        self.instance_name = response.instance_name
-        self.exec_service = self.casd.get_exec_service()
-        self.operations_service = self.casd.get_operations_service()
-
-
-class ActionCacheRemote(BaseRemote):
-    def __init__(self, spec, casd):
-        super().__init__(spec)
-        self.casd = casd
-        self.instance_name = None
-        self.ac_service = None
-
-    def close(self):
-        self.ac_service = None
-        super().close()
-
-    def _configure_protocols(self):
-        local_cas = self.casd.get_local_cas()
-        request = local_cas_pb2.GetInstanceNameForRemotesRequest()
-        self.spec.to_localcas_remote(request.action_cache)
-        response = local_cas.GetInstanceNameForRemotes(request)
-        self.instance_name = response.instance_name
-        self.ac_service = self.casd.get_ac_service()
 
 
 # SandboxRemote()
@@ -83,7 +38,6 @@ class SandboxRemote(SandboxREAPI):
 
         context = self._get_context()
         cascache = context.get_cascache()
-        casd = context.get_casd()
 
         specs = context.remote_execution_specs
         if specs is None:
@@ -94,46 +48,26 @@ class SandboxRemote(SandboxREAPI):
         self.action_spec = specs.action_spec
         self.operation_name = None
 
-        if self.storage_spec:
-            self.own_storage_remote = True
-            self.storage_remote = CASRemote(self.storage_spec, cascache)
-            try:
-                self.storage_remote.init()
-            except grpc.RpcError as e:
-                raise SandboxError(
-                    "Failed to contact remote execution CAS endpoint at {}: {}".format(self.storage_spec.url, e)
-                ) from e
-        else:
-            self.own_storage_remote = False
-            self.storage_remote = cascache.get_default_remote()
-
-        self.exec_remote = ExecutionRemote(self.exec_spec, casd)
+        self.re_remote = RERemote(context.remote_cache_spec, specs, cascache)
         try:
-            self.exec_remote.init()
+            self.re_remote.init()
         except grpc.RpcError as e:
-            raise SandboxError(
-                "Failed to contact remote execution service at {}: {}".format(self.exec_spec.url, e)
-            ) from e
-
-        if self.action_spec:
-            self.ac_remote = ActionCacheRemote(self.action_spec, casd)
-            try:
-                self.ac_remote.init()
-            except grpc.RpcError as e:
-                raise SandboxError(
-                    "Failed to contact action cache service at {}: {}".format(self.action_spec.url, e)
-                ) from e
-        else:
-            self.ac_remote = None
+            urls = set()
+            if self.storage_spec:
+                urls.add(self.storage_spec.url)
+            urls.add(self.exec_spec.url)
+            if self.action_spec:
+                urls.add(self.action_spec.url)
+            raise SandboxError("Failed to contact remote execution endpoint at {}: {}".format(sorted(urls), e)) from e
 
     def run_remote_command(self, action_digest):
         # Sends an execution request to the remote execution server.
         #
         # This function blocks until it gets a response from the server.
 
-        stub = self.exec_remote.exec_service
+        stub = self.re_remote.exec_service
         request = remote_execution_pb2.ExecuteRequest(
-            instance_name=self.exec_remote.instance_name, action_digest=action_digest, skip_cache_lookup=False
+            instance_name=self.re_remote.local_cas_instance_name, action_digest=action_digest, skip_cache_lookup=False
         )
 
         def __run_remote_command(stub, execute_request=None, running_operation=None):
@@ -196,7 +130,7 @@ class SandboxRemote(SandboxREAPI):
         if self.operation_name is None:
             return
 
-        stub = self.exec_remote.operations_service
+        stub = self.re_remote.operations_service
         request = operations_pb2.CancelOperationRequest(name=str(self.operation_name))
 
         try:
@@ -220,7 +154,7 @@ class SandboxRemote(SandboxREAPI):
 
             local_missing_blobs = cascache.missing_blobs(required_blobs)
             if local_missing_blobs:
-                cascache.fetch_blobs(self.storage_remote, local_missing_blobs)
+                cascache.fetch_blobs(self.re_remote, local_missing_blobs)
 
     def _execute_action(self, action, flags):
         stdout, stderr = self._get_output()
@@ -232,7 +166,7 @@ class SandboxRemote(SandboxREAPI):
 
         action_digest = cascache.add_object(buffer=action.SerializeToString())
 
-        casremote = self.storage_remote
+        casremote = self.re_remote
 
         # check action cache download and download if there
         action_result = self._check_action_cache(action_digest)
@@ -312,13 +246,13 @@ class SandboxRemote(SandboxREAPI):
         #
         # Should return either the action response or None if not found, raise
         # Sandboxerror if other grpc error was raised
-        if not self.ac_remote:
+        if not self.action_spec:
             return None
 
         request = remote_execution_pb2.GetActionResultRequest(
-            instance_name=self.ac_remote.instance_name, action_digest=action_digest
+            instance_name=self.re_remote.local_cas_instance_name, action_digest=action_digest
         )
-        stub = self.ac_remote.ac_service
+        stub = self.re_remote.ac_service
         try:
             result = stub.GetActionResult(request)
         except grpc.RpcError as e:
@@ -355,9 +289,5 @@ class SandboxRemote(SandboxREAPI):
         return execution_response.result
 
     def _cleanup(self):
-        if self.ac_remote:
-            self.ac_remote.close()
-        if self.exec_remote:
-            self.exec_remote.close()
-        if self.own_storage_remote:
-            self.storage_remote.close()
+        if self.re_remote:
+            self.re_remote.close()

Reply via email to