KevinGG commented on a change in pull request #16601:
URL: https://github.com/apache/beam/pull/16601#discussion_r794048743



##########
File path: 
sdks/python/apache_beam/runners/interactive/interactive_environment.py
##########
@@ -373,11 +376,24 @@ def get_cache_manager(self, pipeline, 
create_if_absent=False):
               '/'.join(cache_root_path.parts[1:]), id(pipeline))
         else:
           cache_dir = tempfile.mkdtemp(dir=cache_root)
+          if not isinstance(pipeline_runner, direct_runner.DirectRunner):
+            _LOGGER.warning(
+                'A local cache directory has been specified while '
+                'not using DirectRunner. It is recommended to cache into a '
+                'GCS bucket instead.')
       else:
-        cache_dir = tempfile.mkdtemp(
-            suffix=str(id(pipeline)),
-            prefix='it-',
-            dir=os.environ.get('TEST_TMPDIR', None))
+        temp_location = pipeline.options.get_all_options()['temp_location']

Review comment:
       Can we also add staging_location since temp_location is not always 
required.

##########
File path: 
sdks/python/apache_beam/runners/interactive/interactive_environment.py
##########
@@ -373,11 +376,24 @@ def get_cache_manager(self, pipeline, 
create_if_absent=False):
               '/'.join(cache_root_path.parts[1:]), id(pipeline))
         else:
           cache_dir = tempfile.mkdtemp(dir=cache_root)
+          if not isinstance(pipeline_runner, direct_runner.DirectRunner):
+            _LOGGER.warning(
+                'A local cache directory has been specified while '
+                'not using DirectRunner. It is recommended to cache into a '
+                'GCS bucket instead.')
       else:
-        cache_dir = tempfile.mkdtemp(
-            suffix=str(id(pipeline)),
-            prefix='it-',
-            dir=os.environ.get('TEST_TMPDIR', None))
+        temp_location = pipeline.options.get_all_options()['temp_location']
+        if isinstance(pipeline_runner, DataflowRunner()) and temp_location:
+          cache_dir = temp_location

Review comment:
       Make sure to add `/id(pipeline)`.
   
   You may abstract the code
   
   ```
         if cache_root.startswith('gs://'):
             cache_root_path = PurePath(cache_root)
             if len(cache_root_path.parts) < 2:
               _LOGGER.error('GCS bucket cache path is too short to be valid.')
               raise ValueError('cache_root GCS bucket path is invalid.')
             bucket_name = cache_root_path.parts[1]
             assert_bucket_exists(bucket_name)
             cache_dir = 'gs://{}/{}'.format(
                 '/'.join(cache_root_path.parts[1:]), id(pipeline))
   ```
   
   into a function and re-apply it here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to