This is an automated email from the ASF dual-hosted git repository.

juergbi pushed a commit to branch juerg/buildbox-asset-remote
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit c1defd0dbe8affb00951bf752a19372358802b73
Author: Jürg Billeter <[email protected]>
AuthorDate: Fri May 31 14:07:52 2024 +0200

    _assetcache.py: Use buildbox-casd as remote asset proxy, if supported
    
    buildbox-casd is already used as CAS proxy. Using buildbox-casd for all
    remote connections aims to improve robustness (e.g., consistent retry
    behavior) and will allow adding support for token-based authentication.
---
 src/buildstream/_assetcache.py | 60 ++++++++++++++++++++++++++++--------------
 src/buildstream/_remote.py     |  3 ---
 2 files changed, 40 insertions(+), 23 deletions(-)

diff --git a/src/buildstream/_assetcache.py b/src/buildstream/_assetcache.py
index 6a4806341..6f9dcba7c 100644
--- a/src/buildstream/_assetcache.py
+++ b/src/buildstream/_assetcache.py
@@ -20,17 +20,20 @@ from typing import List, Dict, Tuple, Iterable, Optional
 import grpc
 
 from . import utils
-from ._cas import CASRemote, CASCache
+from ._cas import CASRemote, CASCache, CASDProcessManager
 from ._exceptions import AssetCacheError, RemoteError
 from ._remotespec import RemoteSpec, RemoteType
 from ._remote import BaseRemote
 from ._protos.build.bazel.remote.asset.v1 import remote_asset_pb2, 
remote_asset_pb2_grpc
+from ._protos.build.buildgrid import local_cas_pb2
 from ._protos.google.rpc import code_pb2
 
 
 class AssetRemote(BaseRemote):
-    def __init__(self, spec):
+    def __init__(self, spec, casd):
         super().__init__(spec)
+        self.casd = casd
+        self.instance_name = None
         self.fetch_service = None
         self.push_service = None
 
@@ -40,9 +43,24 @@ class AssetRemote(BaseRemote):
         super().close()
 
     def _configure_protocols(self):
-        # set up remote asset stubs
-        self.fetch_service = remote_asset_pb2_grpc.FetchStub(self.channel)
-        self.push_service = remote_asset_pb2_grpc.PushStub(self.channel)
+        local_cas = self.casd.get_local_cas()
+        request = local_cas_pb2.GetInstanceNameForRemotesRequest()
+        request.remote_asset.CopyFrom(self.spec.to_localcas_remote())
+        try:
+            response = local_cas.GetInstanceNameForRemotes(request)
+            self.instance_name = response.instance_name
+            self.fetch_service = self.casd.get_asset_fetch()
+            self.push_service = self.casd.get_asset_push()
+        except grpc.RpcError as e:
+            if e.code() == grpc.StatusCode.UNIMPLEMENTED or e.code() == 
grpc.StatusCode.INVALID_ARGUMENT:
+                # buildbox-casd is too old to support asset-only remotes.
+                # Fall back to direct connection.
+                self.instance_name = self.spec.instance_name
+                self.channel = self.spec.open_channel()
+                self.fetch_service = 
remote_asset_pb2_grpc.FetchStub(self.channel)
+                self.push_service = 
remote_asset_pb2_grpc.PushStub(self.channel)
+            else:
+                raise
 
     # _check():
     #
@@ -55,8 +73,8 @@ class AssetRemote(BaseRemote):
     #
     def _check(self):
         request = remote_asset_pb2.FetchBlobRequest()
-        if self.spec.instance_name:
-            request.instance_name = self.spec.instance_name
+        if self.instance_name:
+            request.instance_name = self.instance_name
 
         try:
             self.fetch_service.FetchBlob(request)
@@ -74,8 +92,8 @@ class AssetRemote(BaseRemote):
 
         if self.spec.push:
             request = remote_asset_pb2.PushBlobRequest()
-            if self.spec.instance_name:
-                request.instance_name = self.spec.instance_name
+            if self.instance_name:
+                request.instance_name = self.instance_name
 
             try:
                 self.push_service.PushBlob(request)
@@ -112,8 +130,8 @@ class AssetRemote(BaseRemote):
     #
     def fetch_blob(self, uris, *, qualifiers=None):
         request = remote_asset_pb2.FetchBlobRequest()
-        if self.spec.instance_name:
-            request.instance_name = self.spec.instance_name
+        if self.instance_name:
+            request.instance_name = self.instance_name
         request.uris.extend(uris)
         if qualifiers:
             request.qualifiers.extend(qualifiers)
@@ -153,8 +171,8 @@ class AssetRemote(BaseRemote):
     #
     def fetch_directory(self, uris, *, qualifiers=None):
         request = remote_asset_pb2.FetchDirectoryRequest()
-        if self.spec.instance_name:
-            request.instance_name = self.spec.instance_name
+        if self.instance_name:
+            request.instance_name = self.instance_name
         request.uris.extend(uris)
         if qualifiers:
             request.qualifiers.extend(qualifiers)
@@ -194,8 +212,8 @@ class AssetRemote(BaseRemote):
     #
     def push_blob(self, uris, blob_digest, *, qualifiers=None, 
references_blobs=None, references_directories=None):
         request = remote_asset_pb2.PushBlobRequest()
-        if self.spec.instance_name:
-            request.instance_name = self.spec.instance_name
+        if self.instance_name:
+            request.instance_name = self.instance_name
         request.uris.extend(uris)
         request.blob_digest.CopyFrom(blob_digest)
         if qualifiers:
@@ -231,8 +249,8 @@ class AssetRemote(BaseRemote):
         self, uris, directory_digest, *, qualifiers=None, 
references_blobs=None, references_directories=None
     ):
         request = remote_asset_pb2.PushDirectoryRequest()
-        if self.spec.instance_name:
-            request.instance_name = self.spec.instance_name
+        if self.instance_name:
+            request.instance_name = self.instance_name
         request.uris.extend(uris)
         request.root_directory_digest.CopyFrom(directory_digest)
         if qualifiers:
@@ -262,14 +280,14 @@ class AssetRemote(BaseRemote):
 # to establish a connection to this remote at initialization time.
 #
 class RemotePair:
-    def __init__(self, cas: CASCache, spec: RemoteSpec):
+    def __init__(self, casd: CASDProcessManager, cas: CASCache, spec: 
RemoteSpec):
         self.index: Optional[AssetRemote] = None
         self.storage: Optional[CASRemote] = None
         self.error: Optional[str] = None
 
         try:
             if spec.remote_type in [RemoteType.INDEX, RemoteType.ALL]:
-                index = AssetRemote(spec)
+                index = AssetRemote(spec, casd)
                 index.check()
                 self.index = index
             if spec.remote_type in [RemoteType.STORAGE, RemoteType.ALL]:
@@ -324,6 +342,8 @@ class AssetCache:
         # Hold on to the project specs
         self._project_specs = project_specs
 
+        casd = self.context.get_casd()
+
         for spec in specs:
             # This can be called multiple times, ensure that we only try
             # to instantiate each remote once.
@@ -331,7 +351,7 @@ class AssetCache:
             if spec in self._remotes:
                 continue
 
-            remote = RemotePair(self.cas, spec)
+            remote = RemotePair(casd, self.cas, spec)
             if remote.error:
                 self.context.messenger.warn("Failed to initialize remote {}: 
{}".format(spec.url, remote.error))
 
diff --git a/src/buildstream/_remote.py b/src/buildstream/_remote.py
index 453f7f2a0..32584de09 100644
--- a/src/buildstream/_remote.py
+++ b/src/buildstream/_remote.py
@@ -68,9 +68,6 @@ class BaseRemote:
             if self._initialized:
                 return
 
-            if self.spec:
-                self.channel = self.spec.open_channel()
-
             self._configure_protocols()
             self._initialized = True
 

Reply via email to