[ 
https://issues.apache.org/jira/browse/BEAM-13734?focusedWorklogId=717375&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-717375
 ]

ASF GitHub Bot logged work on BEAM-13734:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 28/Jan/22 23:42
            Start Date: 28/Jan/22 23:42
    Worklog Time Spent: 10m 
      Work Description: KevinGG commented on a change in pull request #16601:
URL: https://github.com/apache/beam/pull/16601#discussion_r794949408



##########
File path: 
sdks/python/apache_beam/runners/interactive/interactive_environment.py
##########
@@ -357,19 +363,47 @@ def get_cache_manager(self, pipeline, 
create_if_absent=False):
     given pipeline. If the pipeline is absent from the environment while
     create_if_absent is True, creates and returns a new file based cache
     manager for the pipeline."""
+    if self._is_in_ipython:
+      warnings.filterwarnings(
+          'ignore',
+          'options is deprecated since First stable release. References to '
+          '<pipeline>.options will not be supported',
+          category=DeprecationWarning)
+
     cache_manager = self._cache_managers.get(str(id(pipeline)), None)
+    if isinstance(pipeline, Pipeline):
+      if hasattr(pipeline.runner, '_underlying_runner'):

Review comment:
       Can this be replaced with `isinstance(pipeline.runner, InteractiveRunner)`?

##########
File path: sdks/python/apache_beam/runners/interactive/utils.py
##########
@@ -427,3 +436,30 @@ def create_var_in_main(name: str,
     from apache_beam.runners.interactive import interactive_environment as ie
     ie.current_env().watch({name: value})
   return name, value
+
+
+def assert_bucket_exists(bucket_name):
+  # type: (str) -> None
+
+  """Asserts whether the specified GCS bucket with the name
+  bucket_name exists.
+
+    Logs an error and raises a ValueError if the bucket does not exist.
+
+    Logs a warning if the bucket cannot be verified to exist.
+  """
+  try:

Review comment:
       You can make it something like:
   ```
   try:
     from apitools.base.py.exceptions import HttpError
     storage_client = ...
     ...
     storage_client.buckets.Get(request)
   except HttpError as e:
     if ...
       ...
     else:
       _LOGGER.warning(
           'HttpError - unable to verify whether bucket %s exists',
           bucket_name)
   except ImportError as e:
     _LOGGER.warning(
         'ImportError - unable to verify whether bucket %s exists',
         bucket_name)
   ```

##########
File path: 
sdks/python/apache_beam/runners/interactive/interactive_environment.py
##########
@@ -357,19 +363,47 @@ def get_cache_manager(self, pipeline, 
create_if_absent=False):
     given pipeline. If the pipeline is absent from the environment while
     create_if_absent is True, creates and returns a new file based cache
     manager for the pipeline."""
+    if self._is_in_ipython:
+      warnings.filterwarnings(
+          'ignore',
+          'options is deprecated since First stable release. References to '
+          '<pipeline>.options will not be supported',
+          category=DeprecationWarning)
+
     cache_manager = self._cache_managers.get(str(id(pipeline)), None)
+    if isinstance(pipeline, Pipeline):
+      if hasattr(pipeline.runner, '_underlying_runner'):
+        pipeline_runner = pipeline.runner._underlying_runner
+      else:
+        pipeline_runner = pipeline.runner
+    else:
+      pipeline_runner = None
     if not cache_manager and create_if_absent:
-      from apache_beam.runners.interactive import interactive_beam as ib
-      if ib.options.cache_root:
-        #TODO(victorhc): Handle the case when the path starts with "gs://"
-        if ib.options.cache_root.startswith("gs://"):
-          raise ValueError("GCS paths are not currently supported.")
-        cache_dir = tempfile.mkdtemp(dir=ib.options.cache_root)
+      cache_root = self.options.cache_root
+      if cache_root:
+        if cache_root.startswith('gs://'):
+          cache_dir = self._get_gcs_cache_dir(pipeline, cache_root)
+        else:
+          cache_dir = tempfile.mkdtemp(dir=cache_root)
+          if not isinstance(pipeline_runner, direct_runner.DirectRunner):
+            _LOGGER.warning(
+                'A local cache directory has been specified while '
+                'not using DirectRunner. It is recommended to cache into a '
+                'GCS bucket instead.')
       else:
-        cache_dir = tempfile.mkdtemp(
-            suffix=str(id(pipeline)),
-            prefix='it-',
-            dir=os.environ.get('TEST_TMPDIR', None))
+        staging_location = pipeline.options.get_all_options(
+        )['staging_location']
+        if isinstance(pipeline_runner, DataflowRunner) and staging_location:
+          cache_dir = self._get_gcs_cache_dir(pipeline, staging_location)
+          _LOGGER.warning(
+              'No cache_root detected. '
+              'Defaulting to temp_location %s for cache location.',

Review comment:
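       This log message should say `staging_location` rather than 
`temp_location`, since the cache directory is derived from staging_location.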

##########
File path: sdks/python/apache_beam/runners/interactive/utils.py
##########
@@ -32,13 +32,22 @@
 import apache_beam as beam
 from apache_beam.dataframe.convert import to_pcollection
 from apache_beam.dataframe.frame_base import DeferredBase
+from apache_beam.internal.gcp import auth
+from apache_beam.internal.http_client import get_new_http
+from apache_beam.io.gcp.internal.clients import storage
 from apache_beam.portability.api.beam_runner_api_pb2 import TestStreamPayload
 from apache_beam.runners.interactive.caching.cacheable import Cacheable
 from apache_beam.runners.interactive.caching.cacheable import CacheKey
 from apache_beam.runners.interactive.caching.expression_cache import 
ExpressionCache
 from apache_beam.testing.test_stream import WindowedValueHolder
 from apache_beam.typehints.schemas import named_fields_from_element_type
 
+# Protect against environments where apitools library is not available.
+try:
+  from apitools.base.py.exceptions import HttpError
+except ImportError:

Review comment:
       This doesn't seem right. You should probably move this import into the 
try-except block of the `assert_bucket_exists` function, add an except block 
for `ImportError` there, and log a similar warning.
   
   I'll put the suggestion in place below.

##########
File path: 
sdks/python/apache_beam/runners/interactive/interactive_environment.py
##########
@@ -357,19 +363,47 @@ def get_cache_manager(self, pipeline, 
create_if_absent=False):
     given pipeline. If the pipeline is absent from the environment while
     create_if_absent is True, creates and returns a new file based cache
     manager for the pipeline."""
+    if self._is_in_ipython:
+      warnings.filterwarnings(
+          'ignore',
+          'options is deprecated since First stable release. References to '
+          '<pipeline>.options will not be supported',
+          category=DeprecationWarning)
+
     cache_manager = self._cache_managers.get(str(id(pipeline)), None)
+    if isinstance(pipeline, Pipeline):
+      if hasattr(pipeline.runner, '_underlying_runner'):
+        pipeline_runner = pipeline.runner._underlying_runner
+      else:
+        pipeline_runner = pipeline.runner
+    else:
+      pipeline_runner = None
     if not cache_manager and create_if_absent:
-      from apache_beam.runners.interactive import interactive_beam as ib
-      if ib.options.cache_root:
-        #TODO(victorhc): Handle the case when the path starts with "gs://"
-        if ib.options.cache_root.startswith("gs://"):
-          raise ValueError("GCS paths are not currently supported.")
-        cache_dir = tempfile.mkdtemp(dir=ib.options.cache_root)
+      cache_root = self.options.cache_root
+      if cache_root:
+        if cache_root.startswith('gs://'):
+          cache_dir = self._get_gcs_cache_dir(pipeline, cache_root)
+        else:
+          cache_dir = tempfile.mkdtemp(dir=cache_root)
+          if not isinstance(pipeline_runner, direct_runner.DirectRunner):
+            _LOGGER.warning(
+                'A local cache directory has been specified while '
+                'not using DirectRunner. It is recommended to cache into a '
+                'GCS bucket instead.')
       else:
-        cache_dir = tempfile.mkdtemp(
-            suffix=str(id(pipeline)),
-            prefix='it-',
-            dir=os.environ.get('TEST_TMPDIR', None))
+        staging_location = pipeline.options.get_all_options(
+        )['staging_location']
+        if isinstance(pipeline_runner, DataflowRunner) and staging_location:
+          cache_dir = self._get_gcs_cache_dir(pipeline, staging_location)
+          _LOGGER.warning(

Review comment:
       Maybe log this at info level instead of warning level.

##########
File path: sdks/python/apache_beam/runners/interactive/utils_test.py
##########
@@ -42,6 +42,30 @@
 from apache_beam.utils.timestamp import Timestamp
 from apache_beam.utils.windowed_value import WindowedValue
 
+# Protect against environments where apitools library is not available.
+try:

Review comment:
       You can skip tests when there is an ImportError.
   
   For example: 
https://github.com/apache/beam/blob/70d9e2a08cc32192790cd9c98ffa15a756877a73/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization_test.py#L47
   
   And 
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/interactive/testing/integration/screen_diff.py#L129
   

##########
File path: sdks/python/apache_beam/runners/interactive/utils_test.py
##########
@@ -42,6 +42,30 @@
 from apache_beam.utils.timestamp import Timestamp
 from apache_beam.utils.windowed_value import WindowedValue
 
+# Protect against environments where apitools library is not available.
+try:
+  from apitools.base.py.exceptions import HttpError
+  from apitools.base.py.exceptions import HttpNotFoundError
+except ImportError:

Review comment:
       ```
   except ImportError:
       _http_error_imported = False
       HttpError = ValueError
       HttpNotFoundError = ValueError
   else:
       _http_error_imported = True
   
   
   @unittest.skipIf(
       not _http_error_imported,
       'http errors are not imported.')
   class GCSUtilsTest...
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 717375)
    Time Spent: 11h  (was: 10h 50m)

> Support cache directories that use GCS buckets
> ----------------------------------------------
>
>                 Key: BEAM-13734
>                 URL: https://issues.apache.org/jira/browse/BEAM-13734
>             Project: Beam
>          Issue Type: New Feature
>          Components: runner-py-interactive
>            Reporter: Victor Chen
>            Assignee: Victor Chen
>            Priority: P2
>          Time Spent: 11h
>  Remaining Estimate: 0h
>
> * Builds off of the work accomplished under BEAM-13685
>  * Modified interactive_environment.py to support caching to a bucket on GCS 
> for batch processing pipelines
>  * If a specified bucket does not exist, the pipeline will terminate and 
> return an error specifying that the bucket does not exist
>  * Added cleanup() functionality to cache_manager.py, to enable the 
> FileBasedCacheManager class to automatically delete cached values on GCS when 
> a bucket path is specified
>  * Added docstring to interactive_beam.py with an example of GCS path 
> assignment
>  * Cached files on GCS will be stored under a directory represented by the 
> value of id(pipeline).
>  ** Example cached path: gs://my-gcs-bucket/cache/dir/id(pipeline)



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to