[ 
https://issues.apache.org/jira/browse/BEAM-4062?focusedWorklogId=92781&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92781
 ]

ASF GitHub Bot logged work on BEAM-4062:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 19/Apr/18 18:46
            Start Date: 19/Apr/18 18:46
    Worklog Time Spent: 10m 
      Work Description: udim commented on a change in pull request #5158: 
[BEAM-4062] Fix performance regression in FileBasedSink.
URL: https://github.com/apache/beam/pull/5158#discussion_r182842848
 
 

 ##########
 File path: sdks/python/apache_beam/io/filebasedsink.py
 ##########
 @@ -188,39 +190,51 @@ def _get_final_name(self, shard_num, num_shards):
         self.file_name_suffix.get()
     ])
 
+  @check_accessible(['file_path_prefix', 'file_name_suffix'])
+  def _get_final_name_glob(self, num_shards):
+    return ''.join([
+      self.file_path_prefix.get(),
+      self.shard_name_glob_format % dict(num_shards=num_shards),
+      self.file_name_suffix.get()
+    ])
+
   def pre_finalize(self, init_result, writer_results):
-    writer_results = sorted(writer_results)
-    num_shards = len(writer_results)
-    existing_files = []
-    for shard_num in range(len(writer_results)):
-      final_name = self._get_final_name(shard_num, num_shards)
-      if FileSystems.exists(final_name):
-        existing_files.append(final_name)
-    if existing_files:
-      logging.info('Deleting existing files in target path: %d',
-                   len(existing_files))
-      FileSystems.delete(existing_files)
+    num_shards = len(list(writer_results))
+    dst_glob = self._get_final_name_glob(num_shards)
+    dst_glob_files = [file_metadata.path
+                      for mr in FileSystems.match([dst_glob])
+                      for file_metadata in mr.metadata_list]
 
-  @check_accessible(['file_path_prefix'])
-  def finalize_write(self, init_result, writer_results,
-                     unused_pre_finalize_results):
-    writer_results = sorted(writer_results)
-    num_shards = len(writer_results)
+    if dst_glob_files:
+      logging.info('Deleting existing files in target path: %d',
+                   len(dst_glob_files))
+      FileSystems.delete(dst_glob_files)
+
+  def _check_state_for_finalize_write(self, writer_results, num_shards):
 
 Review comment:
   done

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 92781)
    Time Spent: 1h 50m  (was: 1h 40m)

> Performance regression in FileBasedSink
> ---------------------------------------
>
>                 Key: BEAM-4062
>                 URL: https://issues.apache.org/jira/browse/BEAM-4062
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Udi Meiri
>            Assignee: Udi Meiri
>            Priority: Blocker
>             Fix For: 2.5.0
>
>          Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> [https://github.com/apache/beam/pull/4648] has added:
>  * 3 or more stat() calls per output file (in pre_finalize and 
> finalize_writes)
>  * serial unbatched delete()s (in pre_finalize)
> Solution will be to list files in a batch operation (match()), and to 
> delete() in batch mode, or use multiple threads if that's not possible.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to