shunping commented on code in PR #35253:
URL: https://github.com/apache/beam/pull/35253#discussion_r2234129573


##########
sdks/python/apache_beam/io/filebasedsink.py:
##########
@@ -385,13 +431,131 @@ def _rename_batch(batch):
       # This error is not serious, we simply log it.
       _LOGGER.info('Unable to delete file: %s', init_result)
 
+  @check_accessible(['file_path_prefix'])
+  def finalize_windowed_write(
+      self, init_result, writer_results, unused_pre_finalize_results, w=None):
+    writer_results = sorted(writer_results)
+    num_shards = len(writer_results)
+
+    src_files, dst_files, delete_files, num_skipped = (
+        self._check_state_for_finalize_write(writer_results, num_shards, w))
+    num_skipped += len(delete_files)
+    FileSystems.delete(delete_files)
+    num_shards_to_finalize = len(src_files)
+    min_threads = min(num_shards_to_finalize, FileBasedSink._MAX_RENAME_THREADS)
+    num_threads = max(1, min_threads)
+
+    chunk_size = FileSystems.get_chunk_size(self.file_path_prefix.get())
+    source_file_batch = [
+        src_files[i:i + chunk_size]
+        for i in range(0, len(src_files), chunk_size)
+    ]
+    destination_file_batch = [
+        dst_files[i:i + chunk_size]
+        for i in range(0, len(dst_files), chunk_size)
+    ]
+
+    if num_shards_to_finalize:
+      start_time = time.time()
+
+      def _rename_batch(batch):
+        """_rename_batch executes batch rename operations."""
+        source_files, destination_files = batch
+        exceptions = []
+        try:
+          FileSystems.rename(source_files, destination_files)
+          return exceptions
+        except BeamIOError as exp:
+          if exp.exception_details is None:
+            raise
+          for (src, dst), exception in exp.exception_details.items():
+            if exception:
+              _LOGGER.error(
+                  ('Exception in _rename_batch. src: %s, '
+                   'dst: %s, err: %s'),
+                  src,
+                  dst,
+                  exception)
+              exceptions.append(exception)
+            else:
+              _LOGGER.debug('Rename successful: %s -> %s', src, dst)
+          return exceptions
+
+      if w is None or isinstance(w, window.GlobalWindow):
+        # bounded input is handled by finalize_write
+        # this should not be executed
+        # Use a thread pool for renaming operations.
+        exception_batches = util.run_using_threadpool(
+            _rename_batch,
+            list(zip(source_file_batch, destination_file_batch)),
+            num_threads)
+
+        all_exceptions = [
+            e for exception_batch in exception_batches for e in exception_batch
+        ]
+        if all_exceptions:
+          raise Exception(
+              'Encountered exceptions in finalize_write: %s' % all_exceptions)
+
+        yield from dst_files
+      else:
+        # unbounded input
+        batch = list([src_files, dst_files])
+        exception_batches = _rename_batch(batch)
+
+        all_exceptions = [
+            e for exception_batch in exception_batches for e in exception_batch
+        ]
+        if all_exceptions:
+          raise Exception(
+              'Encountered exceptions in finalize_write: %s' % all_exceptions)
+
+        yield from dst_files
+
+      _LOGGER.info(
+          'Renamed %d shards in %.2f seconds.',
+          num_shards_to_finalize,
+          time.time() - start_time)
+    else:
+      _LOGGER.warning(
+          'No shards found to finalize. num_shards: %d, skipped: %d',
+          num_shards,
+          num_skipped)
+
+    try:
+      FileSystems.delete([init_result])
+    except IOError:
+      # This error is not serious, we simply log it.
+      _LOGGER.info('Unable to delete file: %s', init_result)
+
+  @staticmethod
+  def _template_replace_window(shard_name_template):
+    match = re.search('W+', shard_name_template)
+    if match:
+      shard_name_template = shard_name_template.replace(
+          match.group(0), '%%(window)0%ds' % len(match.group(0)))
+    match = re.search('V+', shard_name_template)
+    if match:
+      shard_name_template = shard_name_template.replace(
+          match.group(0), '%%(window_utc)0%ds' % len(match.group(0)))
+    return shard_name_template
+
+  @staticmethod
+  def _template_replace_uuid(shard_name_template):
+    match = re.search('U+', shard_name_template)
+    if match:
+      shard_name_template = shard_name_template.replace(
+          match.group(0), '%%(uuid)0%dd' % len(match.group(0)))
+    return FileBasedSink._template_replace_window(shard_name_template)
+
   @staticmethod
   def _template_replace_num_shards(shard_name_template):
     match = re.search('N+', shard_name_template)
     if match:
       shard_name_template = shard_name_template.replace(
           match.group(0), '%%(num_shards)0%dd' % len(match.group(0)))
-    return shard_name_template
+    #return shard_name_template
+    return FileBasedSink._template_replace_uuid(shard_name_template)

Review Comment:
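   I now understand what these functions (i.e. `_template_replace_window`, `_template_replace_uuid`, `_template_replace_num_shards`) are doing, but the way they chain into one another (`_template_replace_num_shards` calls `_template_replace_uuid`, which in turn calls `_template_replace_window`) is a bit confusing.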
   
   How about we simplify `_template_to_format` as follows, applying each replacement in turn?
   
   ```python
   def _template_to_format(shard_name_template):
     if not shard_name_template:
       return ''
     replace_funcs = [
         FileBasedSink._template_replace_shard_num,
         FileBasedSink._template_replace_num_shards,
         FileBasedSink._template_replace_uuid,
         FileBasedSink._template_replace_window,
     ]
     for func in replace_funcs:
       shard_name_template = func(shard_name_template)
     return shard_name_template
   ```
   
   Then we can have
   ```python
     def _template_replace_shard_num(shard_name_template):
       match = re.search('S+', shard_name_template)
       if match is None:
         # shard name is required in the template.
         raise ValueError(
             "Shard number pattern S+ not found in shard_name_template: %s" %
             shard_name_template)
       return shard_name_template.replace(
         match.group(0), '%%(shard_num)0%dd' % len(match.group(0)))
       ...
   
     def _template_replace_num_shards(shard_name_template):
       match = re.search('N+', shard_name_template)
       if match:
         shard_name_template = shard_name_template.replace(
             match.group(0), '%%(num_shards)0%dd' % len(match.group(0)))
       return shard_name_template
   
     def _template_replace_uuid(shard_name_template):
       ...
   
     def _template_replace_window(shard_name_template):
       ...
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to