This is an automated email from the ASF dual-hosted git repository. juergbi pushed a commit to branch jbilleter/action-cache in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit c1d3e8836891089d2122b9d5c9f234a22d362c75 Author: Jürg Billeter <[email protected]> AuthorDate: Fri Jun 27 18:17:45 2025 +0200 sandbox: Create single buildbox-casd instance for remote execution --- src/buildstream/sandbox/_reremote.py | 52 ++++++++++++++ src/buildstream/sandbox/_sandboxremote.py | 110 ++++++------------------------ 2 files changed, 72 insertions(+), 90 deletions(-) diff --git a/src/buildstream/sandbox/_reremote.py b/src/buildstream/sandbox/_reremote.py new file mode 100644 index 000000000..4c7773fec --- /dev/null +++ b/src/buildstream/sandbox/_reremote.py @@ -0,0 +1,52 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from .._protos.build.buildgrid import local_cas_pb2 + +from .._cas import CASRemote + + +class RERemote(CASRemote): + def __init__(self, cas_spec, remote_execution_specs, cascache): + super().__init__(cas_spec, cascache) + + self.remote_execution_specs = remote_execution_specs + self.exec_service = None + self.operations_service = None + self.ac_service = None + + def close(self): + self.exec_service = None + self.operations_service = None + self.ac_service = None + super().close() + + def _configure_protocols(self): + local_cas = self.cascache.get_local_cas() + request = local_cas_pb2.GetInstanceNameForRemotesRequest() + if self.remote_execution_specs.storage_spec: + self.remote_execution_specs.storage_spec.to_localcas_remote(request.content_addressable_storage) + else: + self.spec.to_localcas_remote(request.content_addressable_storage) + if self.remote_execution_specs.exec_spec: + self.remote_execution_specs.exec_spec.to_localcas_remote(request.execution) + if self.remote_execution_specs.action_spec: + self.remote_execution_specs.action_spec.to_localcas_remote(request.action_cache) + response = local_cas.GetInstanceNameForRemotes(request) + self.local_cas_instance_name = response.instance_name + + casd = self.cascache.get_casd() + self.exec_service = casd.get_exec_service() + self.operations_service = casd.get_operations_service() + self.ac_service = casd.get_ac_service() diff --git a/src/buildstream/sandbox/_sandboxremote.py b/src/buildstream/sandbox/_sandboxremote.py index bb185b72b..dbd967a95 100644 --- a/src/buildstream/sandbox/_sandboxremote.py +++ b/src/buildstream/sandbox/_sandboxremote.py @@ -18,58 +18,13 @@ import shutil import grpc +from ._reremote import RERemote from ._sandboxreapi import SandboxREAPI from .. import _signals -from .._remote import BaseRemote from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 -from .._protos.build.buildgrid import local_cas_pb2 from .._protos.google.rpc import code_pb2 from .._exceptions import BstError, SandboxError from .._protos.google.longrunning import operations_pb2 -from .._cas import CASRemote - - -class ExecutionRemote(BaseRemote): - def __init__(self, spec, casd): - super().__init__(spec) - self.casd = casd - self.instance_name = None - self.exec_service = None - self.operations_service = None - - def close(self): - self.exec_service = None - self.operations_service = None - super().close() - - def _configure_protocols(self): - local_cas = self.casd.get_local_cas() - request = local_cas_pb2.GetInstanceNameForRemotesRequest() - self.spec.to_localcas_remote(request.execution) - response = local_cas.GetInstanceNameForRemotes(request) - self.instance_name = response.instance_name - self.exec_service = self.casd.get_exec_service() - self.operations_service = self.casd.get_operations_service() - - -class ActionCacheRemote(BaseRemote): - def __init__(self, spec, casd): - super().__init__(spec) - self.casd = casd - self.instance_name = None - self.ac_service = None - - def close(self): - self.ac_service = None - super().close() - - def _configure_protocols(self): - local_cas = self.casd.get_local_cas() - request = local_cas_pb2.GetInstanceNameForRemotesRequest() - self.spec.to_localcas_remote(request.action_cache) - response = local_cas.GetInstanceNameForRemotes(request) - self.instance_name = response.instance_name - self.ac_service = self.casd.get_ac_service() # SandboxRemote() @@ -83,7 +38,6 @@ class SandboxRemote(SandboxREAPI): context = self._get_context() cascache = context.get_cascache() - casd = context.get_casd() specs = context.remote_execution_specs if specs is None: @@ -94,46 +48,26 @@ class SandboxRemote(SandboxREAPI): self.action_spec = specs.action_spec self.operation_name = None - if self.storage_spec: - self.own_storage_remote = True - self.storage_remote = CASRemote(self.storage_spec, cascache) - try: - self.storage_remote.init() - except grpc.RpcError as e: - raise SandboxError( - "Failed to contact remote execution CAS endpoint at {}: {}".format(self.storage_spec.url, e) - ) from e - else: - self.own_storage_remote = False - self.storage_remote = cascache.get_default_remote() - - self.exec_remote = ExecutionRemote(self.exec_spec, casd) + self.re_remote = RERemote(context.remote_cache_spec, specs, cascache) try: - self.exec_remote.init() + self.re_remote.init() except grpc.RpcError as e: - raise SandboxError( - "Failed to contact remote execution service at {}: {}".format(self.exec_spec.url, e) - ) from e - - if self.action_spec: - self.ac_remote = ActionCacheRemote(self.action_spec, casd) - try: - self.ac_remote.init() - except grpc.RpcError as e: - raise SandboxError( - "Failed to contact action cache service at {}: {}".format(self.action_spec.url, e) - ) from e - else: - self.ac_remote = None + urls = set() + if self.storage_spec: + urls.add(self.storage_spec.url) + urls.add(self.exec_spec.url) + if self.action_spec: + urls.add(self.action_spec.url) + raise SandboxError("Failed to contact remote execution endpoint at {}: {}".format(sorted(urls), e)) from e def run_remote_command(self, action_digest): # Sends an execution request to the remote execution server. # # This function blocks until it gets a response from the server. - stub = self.exec_remote.exec_service + stub = self.re_remote.exec_service request = remote_execution_pb2.ExecuteRequest( - instance_name=self.exec_remote.instance_name, action_digest=action_digest, skip_cache_lookup=False + instance_name=self.re_remote.local_cas_instance_name, action_digest=action_digest, skip_cache_lookup=False ) def __run_remote_command(stub, execute_request=None, running_operation=None): @@ -196,7 +130,7 @@ class SandboxRemote(SandboxREAPI): if self.operation_name is None: return - stub = self.exec_remote.operations_service + stub = self.re_remote.operations_service request = operations_pb2.CancelOperationRequest(name=str(self.operation_name)) try: @@ -220,7 +154,7 @@ class SandboxRemote(SandboxREAPI): local_missing_blobs = cascache.missing_blobs(required_blobs) if local_missing_blobs: - cascache.fetch_blobs(self.storage_remote, local_missing_blobs) + cascache.fetch_blobs(self.re_remote, local_missing_blobs) def _execute_action(self, action, flags): stdout, stderr = self._get_output() @@ -232,7 +166,7 @@ class SandboxRemote(SandboxREAPI): action_digest = cascache.add_object(buffer=action.SerializeToString()) - casremote = self.storage_remote + casremote = self.re_remote # check action cache download and download if there action_result = self._check_action_cache(action_digest) @@ -312,13 +246,13 @@ class SandboxRemote(SandboxREAPI): # # Should return either the action response or None if not found, raise # Sandboxerror if other grpc error was raised - if not self.ac_remote: + if not self.action_spec: return None request = remote_execution_pb2.GetActionResultRequest( - instance_name=self.ac_remote.instance_name, action_digest=action_digest + instance_name=self.re_remote.local_cas_instance_name, action_digest=action_digest ) - stub = self.ac_remote.ac_service + stub = self.re_remote.ac_service try: result = stub.GetActionResult(request) except grpc.RpcError as e: @@ -355,9 +289,5 @@ class SandboxRemote(SandboxREAPI): return execution_response.result def _cleanup(self): - if self.ac_remote: - self.ac_remote.close() - if self.exec_remote: - self.exec_remote.close() - if self.own_storage_remote: - self.storage_remote.close() + if self.re_remote: + self.re_remote.close()
