KevinGG commented on a change in pull request #16601:
URL: https://github.com/apache/beam/pull/16601#discussion_r792924698
##########
File path: sdks/python/apache_beam/runners/interactive/background_caching_job.py
##########
@@ -232,12 +232,12 @@ def has_source_to_cache(user_pipeline):
file_based_cm = ie.current_env().get_cache_manager(user_pipeline)
cache_dir = file_based_cm._cache_dir
- from apache_beam.runners.interactive import interactive_beam as ib
- if ib.options.cache_root:
- #TODO(victorhc): Handle the case when the path starts with "gs://"
- if ib.options.cache_root.startswith("gs://"):
- raise ValueError("GCS paths are not currently supported.")
- cache_dir = ib.options.cache_root
+ if ie.current_env().options.cache_root:
Review comment:
This seems verbose. How about:
```
cache_root = ie.current_env().options.cache_root
if cache_root:
...
if cache_root.startswith('gs://'):
raise ...
cache_dir = cache_root
```
Also nit: use single quotes whenever you can to conform to the existing code
style.
##########
File path:
sdks/python/apache_beam/runners/interactive/interactive_environment.py
##########
@@ -359,12 +363,16 @@ def get_cache_manager(self, pipeline,
create_if_absent=False):
manager for the pipeline."""
cache_manager = self._cache_managers.get(str(id(pipeline)), None)
if not cache_manager and create_if_absent:
- from apache_beam.runners.interactive import interactive_beam as ib
- if ib.options.cache_root:
- #TODO(victorhc): Handle the case when the path starts with "gs://"
- if ib.options.cache_root.startswith("gs://"):
- raise ValueError("GCS paths are not currently supported.")
- cache_dir = tempfile.mkdtemp(dir=ib.options.cache_root)
+ cache_root = self.options.cache_root
+ if cache_root:
+ if cache_root.startswith("gs://"):
+ cache_root_path = PurePath(cache_root)
+ bucket_name = cache_root_path.parts[1]
+ self.check_bucket_exists(bucket_name)
+ cache_dir = "gs://"+"/".join(cache_root_path.parts[1:]) \
+ + "/" + str(id(pipeline))
Review comment:
You can build the string with `str.format`:
```
# Note how the function call spans multiple lines; please use yapf to
# format it though.
'gs://{}/{}'.format(
'/'.join(cache_root_path.parts[1:]),
id(pipeline))
```
This avoids the backslash line continuation and improves readability.
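For illustration, here is a minimal runnable sketch of that formatting. The helper name `gcs_cache_dir` and the sample bucket are hypothetical, and `PurePosixPath` is used instead of `PurePath` only so the result does not depend on the host OS:

```python
from pathlib import PurePosixPath


def gcs_cache_dir(cache_root, pipeline_id):
  """Hypothetical helper: builds the per-pipeline cache dir under a gs:// root."""
  # PurePosixPath('gs://bucket/dir').parts is ('gs:', 'bucket', 'dir'); the
  # scheme is dropped here and re-added by the format string.
  parts = PurePosixPath(cache_root).parts
  return 'gs://{}/{}'.format('/'.join(parts[1:]), pipeline_id)


print(gcs_cache_dir('gs://my-bucket/cache/dir', 1234))
# prints gs://my-bucket/cache/dir/1234
```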
##########
File path: sdks/python/apache_beam/runners/interactive/interactive_beam.py
##########
@@ -236,6 +236,9 @@ def cache_root(self, value):
Example of local directory usage::
interactive_beam.options.cache_root = "/Users/username/my/cache/dir"
+
+ Example of GCS directory usage::
Review comment:
nit: let's use single quotes here too.
##########
File path:
sdks/python/apache_beam/runners/interactive/interactive_environment.py
##########
@@ -675,3 +683,25 @@ def get_sql_chain(self, pipeline, set_user_pipeline=False):
pipeline)
chain.user_pipeline = pipeline
return chain
+
+ def check_bucket_exists(self, bucket_name):
+ try:
+ from apitools.base.py.exceptions import HttpError
+ except:
+ raise ImportError('Could not import HttpError from apitools.')
Review comment:
+1. If moving the import to the top, please follow this example:
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/gcsio.py#L57
Or, see if the ImportError can be merged into the `unable to verify whether
bucket {} exists` warning branch, because you might not want the import error
to fail the whole module.
I also think this function could be moved to the utils.py module:
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/interactive/utils.py
Please add a docstring and type hints to this function so that we know what
it does.
Things I'm not sure about:
- If the function is named `check_...`, shouldn't it return True/False like
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/gcsio.py#L461?
But then how would you differentiate "does not exist" from "not able to check"
with a bool value?
- If you rename the function to `assert_...` so that it only asserts that the
bucket exists without returning a value, you may also need to add a `raise`:
```
except HttpError as e:
if e.status_code == 404:
_LOGGER.error('%s bucket does not exist!', bucket_name)
raise
else:
```
Otherwise, the error branch only logs and the caller never learns that the
bucket does not exist.
Please also add unit tests for this function covering the different scenarios.
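A minimal sketch of the `assert_...` variant, for illustration only. `FakeHttpError` and the injected `get_bucket` callable are hypothetical stand-ins (not apitools or Beam APIs) so that the snippet runs without GCP dependencies; the real function would presumably catch apitools' `HttpError` and call the storage client instead:

```python
import logging
from typing import Callable

_LOGGER = logging.getLogger(__name__)


class FakeHttpError(Exception):
  """Hypothetical stand-in for an HTTP error; only status_code is modeled."""
  def __init__(self, status_code: int):
    super().__init__('HTTP %s' % status_code)
    self.status_code = status_code


def assert_bucket_exists(
    bucket_name: str, get_bucket: Callable[[str], object]) -> None:
  """Raises if the bucket is known not to exist.

  Any other HTTP error is logged as a warning and swallowed, because in that
  case existence can be neither confirmed nor denied.
  """
  try:
    get_bucket(bucket_name)
  except FakeHttpError as e:
    if e.status_code == 404:
      _LOGGER.error('%s bucket does not exist!', bucket_name)
      raise  # Without this, a missing bucket would only be logged.
    else:
      _LOGGER.warning(
          'HttpError - unable to verify whether bucket %s exists.',
          bucket_name)
```

Injecting `get_bucket` also makes the scenarios easy to unit test: a callable that returns normally, one that raises a 404, and one that raises some other status.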
##########
File path:
sdks/python/apache_beam/runners/interactive/interactive_environment.py
##########
@@ -675,3 +683,25 @@ def get_sql_chain(self, pipeline, set_user_pipeline=False):
pipeline)
chain.user_pipeline = pipeline
return chain
+
+ def check_bucket_exists(self, bucket_name):
+ try:
+ from apitools.base.py.exceptions import HttpError
+ except:
+ raise ImportError('Could not import HttpError from apitools.')
+
+ try:
+ storage_client = storage.StorageV1(
+ credentials=auth.get_service_credentials(),
+ get_credentials=False,
+ http=get_new_http(),
+ response_encoding='utf8')
+ request = storage.StorageBucketsGetRequest(bucket=bucket_name)
+ storage_client.buckets.Get(request)
+ except HttpError as e:
+ if e.status_code == 404:
+ _LOGGER.error("{} bucket does not exist!".format(bucket_name))
+ else:
+ _LOGGER.warning(
+ "HttpError - unable to verify whether bucket {} exists.\
+ ".format(bucket_name))
Review comment:
Please do not pre-format messages in logging statements. Pre-formatting
always runs, even when the logger configuration discards the record, so it
can waste resources.
Instead, pass the values as arguments and let the logger format the message
only when the record is actually emitted:
```
_LOGGER.level('template %s message', arg1, arg2, arg3, ...)
```
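To make the difference concrete, here is a small sketch using only the standard `logging` module. The logger name and the `CountingName` class are hypothetical and exist only to count how often the argument is rendered to a string:

```python
import logging

_LOGGER = logging.getLogger('lazy_logging_sketch')
_LOGGER.setLevel(logging.ERROR)  # WARNING records are discarded.


class CountingName:
  """Hypothetical helper that counts how often it is rendered to a string."""
  def __init__(self, name):
    self.name = name
    self.renders = 0

  def __str__(self):
    self.renders += 1
    return self.name


bucket = CountingName('my-bucket')

# Pre-formatted: str.format runs before the logger can discard the record.
_LOGGER.warning('unable to verify whether bucket {} exists.'.format(bucket))
eager_renders = bucket.renders

# Lazy: the logger checks the level first and never formats the message.
_LOGGER.warning('unable to verify whether bucket %s exists.', bucket)
lazy_renders = bucket.renders - eager_renders

print(eager_renders, lazy_renders)
# prints 1 0
```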
##########
File path:
sdks/python/apache_beam/runners/interactive/interactive_environment.py
##########
@@ -359,12 +363,16 @@ def get_cache_manager(self, pipeline,
create_if_absent=False):
manager for the pipeline."""
cache_manager = self._cache_managers.get(str(id(pipeline)), None)
if not cache_manager and create_if_absent:
- from apache_beam.runners.interactive import interactive_beam as ib
- if ib.options.cache_root:
- #TODO(victorhc): Handle the case when the path starts with "gs://"
- if ib.options.cache_root.startswith("gs://"):
- raise ValueError("GCS paths are not currently supported.")
- cache_dir = tempfile.mkdtemp(dir=ib.options.cache_root)
+ cache_root = self.options.cache_root
+ if cache_root:
+ if cache_root.startswith("gs://"):
+ cache_root_path = PurePath(cache_root)
+ bucket_name = cache_root_path.parts[1]
Review comment:
Let's also check `len(cache_root_path.parts) < 2` and raise a descriptive
error when the GCS path is invalid (too short), instead of letting `parts[1]`
raise an IndexError.
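A minimal sketch of such a guard, under the assumption that a `ValueError` is the desired error type. The helper name `gcs_bucket_name` and the message wording are hypothetical, and `PurePosixPath` is used only to keep the parsing independent of the host OS:

```python
from pathlib import PurePosixPath


def gcs_bucket_name(cache_root):
  """Hypothetical helper: extracts the bucket name from a gs:// path."""
  # PurePosixPath('gs://').parts is ('gs:',), so a path without a bucket has
  # fewer than two parts.
  parts = PurePosixPath(cache_root).parts
  if len(parts) < 2:
    raise ValueError(
        'Invalid GCS path %r: expected gs://<bucket>[/<dir>].' % cache_root)
  return parts[1]
```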