robertwb commented on code in PR #22130:
URL: https://github.com/apache/beam/pull/22130#discussion_r915257613
##########
sdks/python/apache_beam/io/filebasedsink.py:
##########
@@ -68,6 +68,9 @@ def __init__(
shard_name_template=None,
mime_type='application/octet-stream',
compression_type=CompressionTypes.AUTO,
+ *,
Review Comment:
I don't think we have any project-specific guidance here (beyond general Python
conventions, which would suggest that most of these arguments should be passed
by keyword).
##########
sdks/python/apache_beam/io/filebasedsink.py:
##########
@@ -108,6 +111,8 @@ def __init__(
shard_name_template)
self.compression_type = compression_type
self.mime_type = mime_type
+ self.max_records_per_shard = max_records_per_shard
+ self.max_bytes_per_shard = max_bytes_per_shard
Review Comment:
Nice catch. Fixed so that both take effect.
##########
sdks/python/apache_beam/io/iobase.py:
##########
@@ -848,8 +848,12 @@ class Writer(object):
See ``iobase.Sink`` for more detailed documentation about the process of
writing to a sink.
"""
- def write(self, value):
- """Writes a value to the sink using the current writer."""
+ def write(self, value) -> Optional[bool]:
Review Comment:
It's backwards compatible, which is why I made the return type `Optional`. But
I've since switched to using `at_capacity`, as suggested.
##########
sdks/python/apache_beam/io/filebasedsink.py:
##########
@@ -406,10 +417,33 @@ def __init__(self, sink, temp_shard_path):
self.sink = sink
self.temp_shard_path = temp_shard_path
self.temp_handle = self.sink.open(temp_shard_path)
+ self.num_records_written = 0
def write(self, value):
self.sink.write_record(self.temp_handle, value)
+ if self.sink.max_records_per_shard:
Review Comment:
Done.
##########
sdks/python/apache_beam/io/iobase.py:
##########
@@ -1184,7 +1188,9 @@ def process(self, element, init_result):
if self.writer is None:
# We ignore UUID collisions here since they are extremely rare.
self.writer = self.sink.open_writer(init_result, str(uuid.uuid4()))
- self.writer.write(element)
+ if self.writer.write(element):
Review Comment:
Done.
##########
sdks/python/apache_beam/io/filebasedsink.py:
##########
@@ -406,10 +417,33 @@ def __init__(self, sink, temp_shard_path):
self.sink = sink
self.temp_shard_path = temp_shard_path
self.temp_handle = self.sink.open(temp_shard_path)
+ self.num_records_written = 0
def write(self, value):
self.sink.write_record(self.temp_handle, value)
+ if self.sink.max_records_per_shard:
+ self.num_records_written += 1
+ return self.num_records_written >= self.sink.max_records_per_shard
+ if self.sink.max_bytes_per_shard:
+ return (
+ self.sink.byte_counter.bytes_written >=
self.sink.max_bytes_per_shard)
def close(self):
self.sink.close(self.temp_handle)
return self.temp_shard_path
+
+
+class _ByteCountingWriter:
Review Comment:
Unfortunately, `io.BufferedWriter.write` returns the number of bytes written
_for that call_, not a running total, so a small wrapper is needed to
accumulate the total bytes written to the shard.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@beam.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org