gemini-code-assist[bot] commented on code in PR #38484:
URL: https://github.com/apache/beam/pull/38484#discussion_r3235917687
##########
sdks/python/apache_beam/dataframe/io.py:
##########
@@ -684,13 +684,22 @@ def expand(self, pcoll):
dir, name = self.path, ''
else:
dir, name = io.filesystems.FileSystems.split(self.path)
+ num_shards = self.kwargs.pop('num_shards', None)
+ max_writers_per_bundle = self.kwargs.pop('max_writers_per_bundle', None)
+ write_to_files_kwargs = {}
+ if num_shards is not None:
+ write_to_files_kwargs['shards'] = num_shards
+ write_to_files_kwargs['max_writers_per_bundle'] = (
+ 0 if max_writers_per_bundle is None else max_writers_per_bundle)
+ elif max_writers_per_bundle is not None:
+ write_to_files_kwargs['max_writers_per_bundle'] = max_writers_per_bundle
return pcoll | fileio.WriteToFiles(
path=dir,
- shards=self.kwargs.pop('num_shards', None),
file_naming=self.kwargs.pop(
'file_naming', fileio.default_file_naming(name)),
sink=lambda _: _WriteToPandasFileSink(
- self.writer, self.args, self.kwargs, self.incremental,
self.binary))
+ self.writer, self.args, self.kwargs, self.incremental,
self.binary),
+ **write_to_files_kwargs)
Review Comment:

The logic for handling `num_shards` and `max_writers_per_bundle` can be
simplified for better readability. Additionally, using a local copy of
`self.kwargs` instead of mutating the instance attribute directly is a safer
practice in `expand()`. This ensures that the transform remains idempotent and
avoids unexpected side effects if the same transform instance is used multiple
times in a pipeline.
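
To illustrate the idempotency point with a minimal standalone sketch (`FakeTransform` is a hypothetical stand-in, not Beam code): if `expand()` pops from `self.kwargs`, the first expansion consumes the setting and any later expansion of the same instance silently loses it, whereas popping from a local copy keeps the instance reusable.

```python
class FakeTransform:
  """Hypothetical stand-in for a transform whose expand() reads self.kwargs."""
  def __init__(self, **kwargs):
    self.kwargs = kwargs

  def expand_mutating(self):
    # BAD: pop() mutates the shared instance dict.
    return self.kwargs.pop('num_shards', None)

  def expand_copying(self):
    # GOOD: operate on a local copy; the instance stays reusable.
    kwargs = dict(self.kwargs)
    return kwargs.pop('num_shards', None)

t = FakeTransform(num_shards=5)
print(t.expand_mutating())  # 5
print(t.expand_mutating())  # None -- the setting is gone on reuse

t2 = FakeTransform(num_shards=5)
print(t2.expand_copying())  # 5
print(t2.expand_copying())  # 5 -- idempotent
```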
Regarding the `max_writers_per_bundle=0` default: in Beam's `WriteToFiles`,
a value of `0` is often used to signify 'unlimited' writers per bundle. Note
that when `shards` is set to a positive integer, `WriteToFiles` uses a
shuffle-based path where each shard is processed individually, making
`max_writers_per_bundle` effectively irrelevant. However, if `num_shards` is
`0` (which triggers dynamic sharding), this parameter becomes important for
controlling the number of files produced: raising it above the default limit
of 20 concurrent writers prevents `WriteToFiles` from closing and reopening
files mid-bundle.
```suggestion
    kwargs = dict(self.kwargs)
    num_shards = kwargs.pop('num_shards', None)
    max_writers_per_bundle = kwargs.pop('max_writers_per_bundle', None)
    write_to_files_kwargs = {}
    if num_shards is not None:
      write_to_files_kwargs['shards'] = num_shards
      write_to_files_kwargs['max_writers_per_bundle'] = (
          max_writers_per_bundle if max_writers_per_bundle is not None else 0)
    elif max_writers_per_bundle is not None:
      write_to_files_kwargs['max_writers_per_bundle'] = max_writers_per_bundle
    return pcoll | fileio.WriteToFiles(
        path=dir,
        file_naming=kwargs.pop(
            'file_naming', fileio.default_file_naming(name)),
        sink=lambda _: _WriteToPandasFileSink(
            self.writer, self.args, kwargs, self.incremental, self.binary),
        **write_to_files_kwargs)
```
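
To make the branching concrete, here is a plain-Python sketch of the same logic factored into a helper (the name `resolve_write_kwargs` is hypothetical and only mirrors the suggestion above; it is not part of the Beam API). It shows exactly which keyword arguments `WriteToFiles` would receive in each case.

```python
def resolve_write_kwargs(num_shards=None, max_writers_per_bundle=None):
  """Hypothetical helper mirroring the suggested branching."""
  write_to_files_kwargs = {}
  if num_shards is not None:
    write_to_files_kwargs['shards'] = num_shards
    # 0 is taken to mean 'unlimited' writers per bundle.
    write_to_files_kwargs['max_writers_per_bundle'] = (
        max_writers_per_bundle if max_writers_per_bundle is not None else 0)
  elif max_writers_per_bundle is not None:
    write_to_files_kwargs['max_writers_per_bundle'] = max_writers_per_bundle
  return write_to_files_kwargs

print(resolve_write_kwargs())
# {}
print(resolve_write_kwargs(num_shards=4))
# {'shards': 4, 'max_writers_per_bundle': 0}
print(resolve_write_kwargs(max_writers_per_bundle=50))
# {'max_writers_per_bundle': 50}
```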
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]