[ https://issues.apache.org/jira/browse/BEAM-6167?focusedWorklogId=173414&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-173414 ]
ASF GitHub Bot logged work on BEAM-6167: ---------------------------------------- Author: ASF GitHub Bot Created on: 10/Dec/18 08:15 Start Date: 10/Dec/18 08:15 Worklog Time Spent: 10m Work Description: robertwb closed pull request #7193: [BEAM-6167] Add class ReadFromTextWithFilename (Python) URL: https://github.com/apache/beam/pull/7193 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sdks/python/apache_beam/io/textio.py b/sdks/python/apache_beam/io/textio.py index 73ab4a9deaaa..b31628ac3ca2 100644 --- a/sdks/python/apache_beam/io/textio.py +++ b/sdks/python/apache_beam/io/textio.py @@ -38,7 +38,8 @@ from apache_beam.transforms import PTransform from apache_beam.transforms.display import DisplayDataItem -__all__ = ['ReadFromText', 'ReadAllFromText', 'WriteToText'] +__all__ = ['ReadFromText', 'ReadFromTextWithFilename', 'ReadAllFromText', + 'WriteToText'] class _TextSource(filebasedsource.FileBasedSource): @@ -320,6 +321,14 @@ def _read_record(self, file_to_read, read_buffer): sep_bounds[1] - record_start_position_in_buffer) +class _TextSourceWithFilename(_TextSource): + def read_records(self, file_name, range_tracker): + records = super(_TextSourceWithFilename, self).read_records(file_name, + range_tracker) + for record in records: + yield (file_name, record) + + class _TextSink(filebasedsink.FileBasedSink): """A sink to a GCS or local text file or files.""" @@ -483,6 +492,9 @@ class ReadFromText(PTransform): ``ASCII``. This does not support other encodings such as ``UTF-16`` or ``UTF-32``. 
""" + + _source_class = _TextSource + def __init__( self, file_pattern=None, @@ -518,7 +530,7 @@ def __init__( """ super(ReadFromText, self).__init__(**kwargs) - self._source = _TextSource( + self._source = self._source_class( file_pattern, min_bundle_size, compression_type, strip_trailing_newlines, coder, validate=validate, skip_header_lines=skip_header_lines) @@ -527,6 +539,17 @@ def expand(self, pvalue): return pvalue.pipeline | Read(self._source) +class ReadFromTextWithFilename(ReadFromText): + r"""A :class:`~apache_beam.io.textio.ReadFromText` for reading text + files returning the name of the file and the content of the file. + + This class extend ReadFromText class just setting a different + _source_class attribute. + """ + + _source_class = _TextSourceWithFilename + + class WriteToText(PTransform): """A :class:`~apache_beam.transforms.ptransform.PTransform` for writing to text files.""" diff --git a/sdks/python/apache_beam/io/textio_test.py b/sdks/python/apache_beam/io/textio_test.py index 2ed3b4894940..e42e9400099d 100644 --- a/sdks/python/apache_beam/io/textio_test.py +++ b/sdks/python/apache_beam/io/textio_test.py @@ -20,6 +20,7 @@ from __future__ import division import bz2 +import datetime import glob import gzip import logging @@ -43,6 +44,7 @@ from apache_beam.io.textio import _TextSource as TextSource # Importing following private classes for testing. 
from apache_beam.io.textio import ReadFromText +from apache_beam.io.textio import ReadFromTextWithFilename from apache_beam.io.textio import WriteToText from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.test_utils import TempDir @@ -347,6 +349,15 @@ def test_read_from_text_single_file(self): assert_that(pcoll, equal_to(expected_data)) pipeline.run() + def test_read_from_text_with_file_name_single_file(self): + file_name, data = write_data(5) + expected_data = [(file_name, el) for el in data] + assert len(expected_data) == 5 + pipeline = TestPipeline() + pcoll = pipeline | 'Read' >> ReadFromTextWithFilename(file_name) + assert_that(pcoll, equal_to(expected_data)) + pipeline.run() + def test_read_all_single_file(self): file_name, expected_data = write_data(5) assert len(expected_data) == 5 @@ -416,6 +427,21 @@ def test_read_from_text_file_pattern(self): assert_that(pcoll, equal_to(expected_data)) pipeline.run() + def test_read_from_text_with_file_name_file_pattern(self): + prefix = datetime.datetime.now().strftime("%Y%m%d%H%M%s") + file_name_1, data_1 = write_data(5, prefix=prefix) + file_name_2, data_2 = write_data(5, prefix=prefix) + expected_data = [] + expected_data.extend([(file_name_1, el) for el in data_1]) + expected_data.extend([(file_name_2, el) for el in data_2]) + folder = file_name_1[:file_name_1.rfind(os.path.sep)] + pattern = folder + os.path.sep + prefix + '*' + assert len(expected_data) == 10 + pipeline = TestPipeline() + pcoll = pipeline | 'Read' >> ReadFromTextWithFilename(pattern) + assert_that(pcoll, equal_to(expected_data)) + pipeline.run() + def test_read_all_file_pattern(self): pattern, expected_data = write_pattern([5, 3, 12, 8, 8, 4]) assert len(expected_data) == 40 ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 173414) Time Spent: 1h 50m (was: 1h 40m) > Create a Class to read content of a file keeping track of the file path > (python) > -------------------------------------------------------------------------------- > > Key: BEAM-6167 > URL: https://issues.apache.org/jira/browse/BEAM-6167 > Project: Beam > Issue Type: Improvement > Components: io-ideas > Affects Versions: 2.8.0 > Reporter: Lorenzo Caggioni > Assignee: Eugene Kirpichov > Priority: Minor > Fix For: Not applicable > > Time Spent: 1h 50m > Remaining Estimate: 0h > > Add a class to read content of a file keeping track of the file path each > element comes from. > This is an improvement of the current python/apache_beam/io/textio.py -- This message was sent by Atlassian JIRA (v7.6.3#76005)