This is an automated email from the ASF dual-hosted git repository. juergbi pushed a commit to branch juerg/re-proxy in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit d836828588368b19a89ba517c8864cd6a2c2c268 Author: Jürg Billeter <[email protected]> AuthorDate: Sat Jun 22 11:57:33 2024 +0200 _sandboxremote.py: Use buildbox-casd as remote execution proxy buildbox-casd is already used as CAS and RA proxy. Using buildbox-casd for all remote connections aims to improve robustness (e.g., consistent retry behavior) and will allow adding support for token-based authentication. --- src/buildstream/sandbox/_sandboxremote.py | 142 ++++++++++++++++++++++++------ 1 file changed, 113 insertions(+), 29 deletions(-) diff --git a/src/buildstream/sandbox/_sandboxremote.py b/src/buildstream/sandbox/_sandboxremote.py index 1e8598d6c..ef8ef7de8 100644 --- a/src/buildstream/sandbox/_sandboxremote.py +++ b/src/buildstream/sandbox/_sandboxremote.py @@ -15,19 +15,84 @@ # Jim MacArthur <[email protected]> import shutil -from functools import partial import grpc from ._sandboxreapi import SandboxREAPI from .. import _signals +from .._remote import BaseRemote from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc +from .._protos.build.buildgrid import local_cas_pb2 from .._protos.google.rpc import code_pb2 from .._exceptions import BstError, SandboxError from .._protos.google.longrunning import operations_pb2, operations_pb2_grpc from .._cas import CASRemote +class ExecutionRemote(BaseRemote): + def __init__(self, spec, casd): + super().__init__(spec) + self.casd = casd + self.instance_name = None + self.exec_service = None + self.operations_service = None + + def close(self): + self.exec_service = None + self.operations_service = None + super().close() + + def _configure_protocols(self): + local_cas = self.casd.get_local_cas() + request = local_cas_pb2.GetInstanceNameForRemotesRequest() + self.spec.to_localcas_remote(request.execution) + try: + response = local_cas.GetInstanceNameForRemotes(request) + self.instance_name = response.instance_name + self.exec_service = self.casd.get_exec_service() + self.operations_service = self.casd.get_operations_service() + except grpc.RpcError as e: + if e.code() == grpc.StatusCode.UNIMPLEMENTED or e.code() == grpc.StatusCode.INVALID_ARGUMENT: + # buildbox-casd is too old to support execution service remotes. + # Fall back to direct connection. + self.instance_name = self.spec.instance_name + self.channel = self.spec.open_channel() + self.exec_service = remote_execution_pb2_grpc.ExecutionStub(self.channel) + self.operations_service = operations_pb2_grpc.OperationsStub(self.channel) + else: + raise + + +class ActionCacheRemote(BaseRemote): + def __init__(self, spec, casd): + super().__init__(spec) + self.casd = casd + self.instance_name = None + self.ac_service = None + + def close(self): + self.ac_service = None + super().close() + + def _configure_protocols(self): + local_cas = self.casd.get_local_cas() + request = local_cas_pb2.GetInstanceNameForRemotesRequest() + self.spec.to_localcas_remote(request.action_cache) + try: + response = local_cas.GetInstanceNameForRemotes(request) + self.instance_name = response.instance_name + self.ac_service = self.casd.get_ac_service() + except grpc.RpcError as e: + if e.code() == grpc.StatusCode.UNIMPLEMENTED or e.code() == grpc.StatusCode.INVALID_ARGUMENT: + # buildbox-casd is too old to support action cache remotes. + # Fall back to direct connection. + self.instance_name = self.spec.instance_name + self.channel = self.spec.open_channel() + self.ac_service = remote_execution_pb2_grpc.ActionCacheStub(self.channel) + else: + raise + + # SandboxRemote() # # This isn't really a sandbox, it's a stub which sends all the sources and build @@ -39,6 +104,7 @@ class SandboxRemote(SandboxREAPI): context = self._get_context() cascache = context.get_cascache() + casd = context.get_casd() specs = context.remote_execution_specs if specs is None: @@ -62,15 +128,33 @@ class SandboxRemote(SandboxREAPI): self.own_storage_remote = False self.storage_remote = cascache.get_default_remote() - def run_remote_command(self, channel, action_digest): + self.exec_remote = ExecutionRemote(self.exec_spec, casd) + try: + self.exec_remote.init() + except grpc.RpcError as e: + raise SandboxError( + "Failed to contact remote execution service at {}: {}".format(self.exec_spec.url, e) + ) from e + + if self.action_spec: + self.ac_remote = ActionCacheRemote(self.action_spec, casd) + try: + self.ac_remote.init() + except grpc.RpcError as e: + raise SandboxError( + "Failed to contact action cache service at {}: {}".format(self.action_spec.url, e) + ) from e + else: + self.ac_remote = None + + def run_remote_command(self, action_digest): # Sends an execution request to the remote execution server. # # This function blocks until it gets a response from the server. - # Try to create a communication channel to the BuildGrid server. - stub = remote_execution_pb2_grpc.ExecutionStub(channel) + stub = self.exec_remote.exec_service request = remote_execution_pb2.ExecuteRequest( - instance_name=self.exec_spec.instance_name, action_digest=action_digest, skip_cache_lookup=False + instance_name=self.exec_remote.instance_name, action_digest=action_digest, skip_cache_lookup=False ) def __run_remote_command(stub, execute_request=None, running_operation=None): @@ -117,7 +201,7 @@ class SandboxRemote(SandboxREAPI): operation = None with self._get_context().messenger.timed_activity( "Waiting for the remote build to complete", element_name=self._get_element_name() - ), _signals.terminator(partial(self.cancel_operation, channel)): + ), _signals.terminator(self.cancel_operation): operation = __run_remote_command(stub, execute_request=request) if operation is None: return None @@ -128,12 +212,12 @@ class SandboxRemote(SandboxREAPI): return operation - def cancel_operation(self, channel): + def cancel_operation(self): # If we don't have the name can't send request. if self.operation_name is None: return - stub = operations_pb2_grpc.OperationsStub(channel) + stub = self.exec_remote.operations_service request = operations_pb2.CancelOperationRequest(name=str(self.operation_name)) try: @@ -205,10 +289,8 @@ class SandboxRemote(SandboxREAPI): raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e # Now request to execute the action - channel = self.exec_spec.open_channel() - with channel: - operation = self.run_remote_command(channel, action_digest) - action_result = self._extract_action_result(operation) + operation = self.run_remote_command(action_digest) + action_result = self._extract_action_result(operation) # Fetch outputs for output_directory in action_result.output_directories: @@ -243,25 +325,23 @@ class SandboxRemote(SandboxREAPI): # # Should return either the action response or None if not found, raise # Sandboxerror if other grpc error was raised - if not self.action_spec: + if not self.ac_remote: return None - channel = self.action_spec.open_channel() - with channel: - request = remote_execution_pb2.GetActionResultRequest( - instance_name=self.action_spec.instance_name, action_digest=action_digest - ) - stub = remote_execution_pb2_grpc.ActionCacheStub(channel) - try: - result = stub.GetActionResult(request) - except grpc.RpcError as e: - if e.code() != grpc.StatusCode.NOT_FOUND: - raise SandboxError("Failed to query action cache: {} ({})".format(e.code(), e.details())) - return None - else: - context = self._get_context() - context.messenger.info("Action result found in action cache", element_name=self._get_element_name()) - return result + request = remote_execution_pb2.GetActionResultRequest( + instance_name=self.ac_remote.instance_name, action_digest=action_digest + ) + stub = self.ac_remote.ac_service + try: + result = stub.GetActionResult(request) + except grpc.RpcError as e: + if e.code() != grpc.StatusCode.NOT_FOUND: + raise SandboxError("Failed to query action cache: {} ({})".format(e.code(), e.details())) + return None + else: + context = self._get_context() + context.messenger.info("Action result found in action cache", element_name=self._get_element_name()) + return result @staticmethod def _extract_action_result(operation): @@ -288,5 +368,9 @@ class SandboxRemote(SandboxREAPI): return execution_response.result def _cleanup(self): + if self.ac_remote: + self.ac_remote.close() + if self.exec_remote: + self.exec_remote.close() if self.own_storage_remote: self.storage_remote.close()
