gemini-code-assist[bot] commented on code in PR #38484:
URL: https://github.com/apache/beam/pull/38484#discussion_r3235917687


##########
sdks/python/apache_beam/dataframe/io.py:
##########
@@ -684,13 +684,22 @@ def expand(self, pcoll):
       dir, name = self.path, ''
     else:
       dir, name = io.filesystems.FileSystems.split(self.path)
+    num_shards = self.kwargs.pop('num_shards', None)
+    max_writers_per_bundle = self.kwargs.pop('max_writers_per_bundle', None)
+    write_to_files_kwargs = {}
+    if num_shards is not None:
+      write_to_files_kwargs['shards'] = num_shards
+      write_to_files_kwargs['max_writers_per_bundle'] = (
+          0 if max_writers_per_bundle is None else max_writers_per_bundle)
+    elif max_writers_per_bundle is not None:
+      write_to_files_kwargs['max_writers_per_bundle'] = max_writers_per_bundle
     return pcoll | fileio.WriteToFiles(
         path=dir,
-        shards=self.kwargs.pop('num_shards', None),
         file_naming=self.kwargs.pop(
             'file_naming', fileio.default_file_naming(name)),
         sink=lambda _: _WriteToPandasFileSink(
-            self.writer, self.args, self.kwargs, self.incremental, self.binary))
+            self.writer, self.args, self.kwargs, self.incremental, self.binary),
+        **write_to_files_kwargs)

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The logic for handling `num_shards` and `max_writers_per_bundle` can be 
simplified for better readability. Additionally, using a local copy of 
`self.kwargs` instead of mutating the instance attribute directly is a safer 
practice in `expand()`. This ensures that the transform remains idempotent and 
avoids unexpected side effects if the same transform instance is used multiple 
times in a pipeline.
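   
   As a standalone illustration of the idempotency point (using a hypothetical plain class, not Beam's actual `PTransform`), popping from `self.kwargs` consumes the value on the first expansion, so a second use of the same instance silently loses it; copying first avoids that:
   
   ```python
   class Transform:
     """Hypothetical stand-in for a transform holding user kwargs."""
     def __init__(self, **kwargs):
       self.kwargs = kwargs
   
     def expand_mutating(self):
       # Pops from the instance attribute: a second call loses the value.
       return self.kwargs.pop('num_shards', None)
   
     def expand_copying(self):
       # Pops from a local copy: self.kwargs is left intact.
       kwargs = dict(self.kwargs)
       return kwargs.pop('num_shards', None)
   
   t = Transform(num_shards=3)
   assert t.expand_mutating() == 3
   assert t.expand_mutating() is None  # value gone on reuse
   
   t2 = Transform(num_shards=3)
   assert t2.expand_copying() == 3
   assert t2.expand_copying() == 3  # idempotent across calls
   ```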
   
   Regarding the `max_writers_per_bundle=0` default: in Beam's `WriteToFiles`, a value of `0` signifies 'unlimited' writers per bundle. Note that when `shards` is set to a positive integer, `WriteToFiles` takes a shuffle-based path in which each shard is processed individually, so `max_writers_per_bundle` is effectively irrelevant. However, if `num_shards` is `0` (which triggers dynamic sharding), this parameter becomes important: once the default limit of 20 open writers is exceeded, files are closed and reopened, producing extra files, so lifting the limit controls the number of files produced.
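   
   The defaulting behavior in the suggestion below can be checked in isolation with a small pure-Python sketch of the same branching (hypothetical helper name, mirroring the suggested logic rather than Beam internals):
   
   ```python
   def resolve_write_kwargs(num_shards=None, max_writers_per_bundle=None):
     """Forward only the kwargs the user actually set, defaulting
     max_writers_per_bundle to 0 (unlimited) when fixed sharding is requested."""
     out = {}
     if num_shards is not None:
       out['shards'] = num_shards
       out['max_writers_per_bundle'] = (
           max_writers_per_bundle if max_writers_per_bundle is not None else 0)
     elif max_writers_per_bundle is not None:
       out['max_writers_per_bundle'] = max_writers_per_bundle
     return out
   
   assert resolve_write_kwargs() == {}
   assert resolve_write_kwargs(num_shards=5) == {
       'shards': 5, 'max_writers_per_bundle': 0}
   assert resolve_write_kwargs(max_writers_per_bundle=10) == {
       'max_writers_per_bundle': 10}
   ```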
   
   ```suggestion
       kwargs = dict(self.kwargs)
       num_shards = kwargs.pop('num_shards', None)
       max_writers_per_bundle = kwargs.pop('max_writers_per_bundle', None)
   
       write_to_files_kwargs = {}
       if num_shards is not None:
         write_to_files_kwargs['shards'] = num_shards
         write_to_files_kwargs['max_writers_per_bundle'] = (
             max_writers_per_bundle if max_writers_per_bundle is not None else 0)
       elif max_writers_per_bundle is not None:
         write_to_files_kwargs['max_writers_per_bundle'] = max_writers_per_bundle
   
       return pcoll | fileio.WriteToFiles(
           path=dir,
           file_naming=kwargs.pop(
               'file_naming', fileio.default_file_naming(name)),
           sink=lambda _: _WriteToPandasFileSink(
               self.writer, self.args, kwargs, self.incremental, self.binary),
           **write_to_files_kwargs)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
