KevinGG commented on a change in pull request #16601:
URL: https://github.com/apache/beam/pull/16601#discussion_r792924698



##########
File path: sdks/python/apache_beam/runners/interactive/background_caching_job.py
##########
@@ -232,12 +232,12 @@ def has_source_to_cache(user_pipeline):
 
       file_based_cm = ie.current_env().get_cache_manager(user_pipeline)
       cache_dir = file_based_cm._cache_dir
-      from apache_beam.runners.interactive import interactive_beam as ib
-      if ib.options.cache_root:
-        #TODO(victorhc): Handle the case when the path starts with "gs://"
-        if ib.options.cache_root.startswith("gs://"):
-          raise ValueError("GCS paths are not currently supported.")
-        cache_dir = ib.options.cache_root
+      if ie.current_env().options.cache_root:

Review comment:
       This is a bit verbose. How about binding the option to a local variable first:
   
   ```
   cache_root = ie.current_env().options.cache_root
   if cache_root:
     ...
     if cache_root.startswith('gs://'):
       raise ...
     cache_dir = cache_root
   ```
   
   Also nit: use single quotes wherever you can, to conform to the existing code style.

##########
File path: sdks/python/apache_beam/runners/interactive/interactive_environment.py
##########
@@ -359,12 +363,16 @@ def get_cache_manager(self, pipeline, create_if_absent=False):
     manager for the pipeline."""
     cache_manager = self._cache_managers.get(str(id(pipeline)), None)
     if not cache_manager and create_if_absent:
-      from apache_beam.runners.interactive import interactive_beam as ib
-      if ib.options.cache_root:
-        #TODO(victorhc): Handle the case when the path starts with "gs://"
-        if ib.options.cache_root.startswith("gs://"):
-          raise ValueError("GCS paths are not currently supported.")
-        cache_dir = tempfile.mkdtemp(dir=ib.options.cache_root)
+      cache_root = self.options.cache_root
+      if cache_root:
+        if cache_root.startswith("gs://"):
+          cache_root_path = PurePath(cache_root)
+          bucket_name = cache_root_path.parts[1]
+          self.check_bucket_exists(bucket_name)
+          cache_dir = "gs://"+"/".join(cache_root_path.parts[1:]) \
+                    + "/" + str(id(pipeline))

Review comment:
       You can build the string with `format`:
   
   ```
   # Note how the function call spans multiple lines; please let yapf do the
   # actual formatting, though.
   'gs://{}/{}'.format(
       '/'.join(cache_root_path.parts[1:]),
       id(pipeline))
   ```
   
   This avoids the backslash line continuation and improves readability.

##########
File path: sdks/python/apache_beam/runners/interactive/interactive_beam.py
##########
@@ -236,6 +236,9 @@ def cache_root(self, value):
 
     Example of local directory usage::
       interactive_beam.options.cache_root = "/Users/username/my/cache/dir"
+
+    Example of GCS directory usage::

Review comment:
       nit: let's use single quotes here too.

##########
File path: sdks/python/apache_beam/runners/interactive/interactive_environment.py
##########
@@ -675,3 +683,25 @@ def get_sql_chain(self, pipeline, set_user_pipeline=False):
             pipeline)
       chain.user_pipeline = pipeline
     return chain
+
+  def check_bucket_exists(self, bucket_name):
+    try:
+      from apitools.base.py.exceptions import HttpError
+    except:
+      raise ImportError('Could not import HttpError from apitools.')

Review comment:
       +1. If you move the import to the top, please follow this example: 
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/gcsio.py#L57
   
   Or see whether the ImportError can be merged into the `unable to verify whether bucket {} exists` warning branch, because you might not want the import error to fail the whole module.
   
   I also think this function could be moved to the utils.py module: 
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/interactive/utils.py
   
   Please add a docstring and type hints to this function so that we know what it does.
   Things I'm not sure about:
   
   - If the function is named `check_...`, shouldn't it return True/False like 
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/gcsio.py#L461?
 But then how would a bool value differentiate "does not exist" from "not able to check"?
   - If you rename the function to `assert_...`, so that it only asserts that the bucket exists and returns nothing, you may also need to add a `raise`:
   
   ```
   except HttpError as e:
         if e.status_code == 404:
           _LOGGER.error('%s bucket does not exist!', bucket_name)
           raise
         else:
   ```
   
   Otherwise, the 404 branch only logs the problem and the caller never learns that the bucket is missing.
   
   Please also add unit tests for this function that cover the different scenarios.
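
To illustrate the `assert_...` option above, here is a minimal sketch, not the PR's actual code. `HttpError` below is a hypothetical stand-in for the apitools exception (only `status_code` is modeled), and the `get_bucket` callable is an assumed injection point that replaces the real storage client call so the example runs without GCP dependencies.

```python
import logging
from typing import Callable

_LOGGER = logging.getLogger(__name__)


class HttpError(Exception):
  """Hypothetical stand-in for apitools' HttpError (assumption)."""
  def __init__(self, status_code: int):
    super().__init__('HTTP %d' % status_code)
    self.status_code = status_code


def assert_bucket_exists(
    bucket_name: str, get_bucket: Callable[[str], object]) -> None:
  """Asserts that a bucket exists.

  Re-raises the HttpError when the lookup reports 404 (bucket missing).
  Any other HttpError only produces a warning, because existence could
  not be verified either way.
  """
  try:
    # get_bucket is a hypothetical callable wrapping the storage request.
    get_bucket(bucket_name)
  except HttpError as e:
    if e.status_code == 404:
      _LOGGER.error('%s bucket does not exist!', bucket_name)
      raise
    else:
      _LOGGER.warning(
          'HttpError - unable to verify whether bucket %s exists.',
          bucket_name)
```

With this shape, "does not exist" surfaces as an exception while "not able to check" is only logged, which sidesteps the ambiguity of a single bool return value. Injecting the lookup also makes the three scenarios (found, 404, other error) straightforward to unit test.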
   

##########
File path: sdks/python/apache_beam/runners/interactive/interactive_environment.py
##########
@@ -675,3 +683,25 @@ def get_sql_chain(self, pipeline, set_user_pipeline=False):
             pipeline)
       chain.user_pipeline = pipeline
     return chain
+
+  def check_bucket_exists(self, bucket_name):
+    try:
+      from apitools.base.py.exceptions import HttpError
+    except:
+      raise ImportError('Could not import HttpError from apitools.')
+
+    try:
+      storage_client = storage.StorageV1(
+          credentials=auth.get_service_credentials(),
+          get_credentials=False,
+          http=get_new_http(),
+          response_encoding='utf8')
+      request = storage.StorageBucketsGetRequest(bucket=bucket_name)
+      storage_client.buckets.Get(request)
+    except HttpError as e:
+      if e.status_code == 404:
+        _LOGGER.error("{} bucket does not exist!".format(bucket_name))
+      else:
+        _LOGGER.warning(
+            "HttpError - unable to verify whether bucket {} exists.\
+        ".format(bucket_name))

Review comment:
       Please do not pre-format messages in logging statements. It hurts performance: the logger can skip formatting entirely when the record's level is disabled, whereas pre-formatting always runs and may waste resources.
   
   Instead, pass the values as arguments:
   
   ```
   _LOGGER.level('template %s message', arg1, arg2, arg3, ...)
   ```
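
A small self-contained demonstration of the difference, using only the standard `logging` module. The `CountingArg` class and the logger name are made up for illustration; the class simply counts how often it is rendered to a string.

```python
import logging


class CountingArg:
  """Hypothetical helper that counts how often it is rendered via str()."""
  def __init__(self):
    self.renders = 0

  def __str__(self):
    self.renders += 1
    return 'my-bucket'


logger = logging.getLogger('lazy_logging_demo')
logger.setLevel(logging.ERROR)  # WARNING records are disabled.
logger.addHandler(logging.NullHandler())
logger.propagate = False

eager = CountingArg()
lazy = CountingArg()

# Pre-formatted: str.format runs before the logger checks the level.
logger.warning('unable to verify whether bucket {} exists.'.format(eager))
# Deferred: the argument is only rendered if the record is actually emitted.
logger.warning('unable to verify whether bucket %s exists.', lazy)
```

After both calls, `eager` has been rendered once even though nothing was logged, while `lazy` has not been rendered at all.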

##########
File path: sdks/python/apache_beam/runners/interactive/interactive_environment.py
##########
@@ -359,12 +363,16 @@ def get_cache_manager(self, pipeline, create_if_absent=False):
     manager for the pipeline."""
     cache_manager = self._cache_managers.get(str(id(pipeline)), None)
     if not cache_manager and create_if_absent:
-      from apache_beam.runners.interactive import interactive_beam as ib
-      if ib.options.cache_root:
-        #TODO(victorhc): Handle the case when the path starts with "gs://"
-        if ib.options.cache_root.startswith("gs://"):
-          raise ValueError("GCS paths are not currently supported.")
-        cache_dir = tempfile.mkdtemp(dir=ib.options.cache_root)
+      cache_root = self.options.cache_root
+      if cache_root:
+        if cache_root.startswith("gs://"):
+          cache_root_path = PurePath(cache_root)
+          bucket_name = cache_root_path.parts[1]

Review comment:
       Let's also add a `len(cache_root_path.parts) < 2` check and raise an error when the GCS path is invalid (too short), rather than letting it fail with an index-out-of-range exception.
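
A rough sketch of that check, combined with the `format` suggestion from the earlier comment. The helper name `gcs_cache_dir` and the error message are hypothetical, and `PurePosixPath` is used here instead of the PR's `PurePath` (an assumption) so that the parsing does not depend on the host OS.

```python
from pathlib import PurePosixPath


def gcs_cache_dir(cache_root: str, pipeline_id: int) -> str:
  """Builds a per-pipeline cache dir under a gs:// cache root.

  Hypothetical helper: 'gs://bucket/dir' parses into the parts
  ('gs:', 'bucket', 'dir'), so fewer than 2 parts means the bucket
  name is missing.
  """
  parts = PurePosixPath(cache_root).parts
  if len(parts) < 2:
    raise ValueError(
        'Invalid GCS cache_root %r: expected gs://<bucket>[/<dir>].' %
        cache_root)
  return 'gs://{}/{}'.format('/'.join(parts[1:]), pipeline_id)
```

For example, `gcs_cache_dir('gs://my-bucket/cache', 42)` yields `'gs://my-bucket/cache/42'`, while `gcs_cache_dir('gs://', 42)` raises a `ValueError` with a readable message.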




