[ 
https://issues.apache.org/jira/browse/BEAM-6167?focusedWorklogId=173414&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-173414
 ]

ASF GitHub Bot logged work on BEAM-6167:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 10/Dec/18 08:15
            Start Date: 10/Dec/18 08:15
    Worklog Time Spent: 10m 
      Work Description: robertwb closed pull request #7193: [BEAM-6167] Add 
class ReadFromTextWithFilename (Python)
URL: https://github.com/apache/beam/pull/7193
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/python/apache_beam/io/textio.py 
b/sdks/python/apache_beam/io/textio.py
index 73ab4a9deaaa..b31628ac3ca2 100644
--- a/sdks/python/apache_beam/io/textio.py
+++ b/sdks/python/apache_beam/io/textio.py
@@ -38,7 +38,8 @@
 from apache_beam.transforms import PTransform
 from apache_beam.transforms.display import DisplayDataItem
 
-__all__ = ['ReadFromText', 'ReadAllFromText', 'WriteToText']
+__all__ = ['ReadFromText', 'ReadFromTextWithFilename', 'ReadAllFromText',
+           'WriteToText']
 
 
 class _TextSource(filebasedsource.FileBasedSource):
@@ -320,6 +321,14 @@ def _read_record(self, file_to_read, read_buffer):
               sep_bounds[1] - record_start_position_in_buffer)
 
 
+class _TextSourceWithFilename(_TextSource):
+  def read_records(self, file_name, range_tracker):
+    records = super(_TextSourceWithFilename, self).read_records(file_name,
+                                                                range_tracker)
+    for record in records:
+      yield (file_name, record)
+
+
 class _TextSink(filebasedsink.FileBasedSink):
   """A sink to a GCS or local text file or files."""
 
@@ -483,6 +492,9 @@ class ReadFromText(PTransform):
   ``ASCII``.
   This does not support other encodings such as ``UTF-16`` or ``UTF-32``.
   """
+
+  _source_class = _TextSource
+
   def __init__(
       self,
       file_pattern=None,
@@ -518,7 +530,7 @@ def __init__(
     """
 
     super(ReadFromText, self).__init__(**kwargs)
-    self._source = _TextSource(
+    self._source = self._source_class(
         file_pattern, min_bundle_size, compression_type,
         strip_trailing_newlines, coder, validate=validate,
         skip_header_lines=skip_header_lines)
@@ -527,6 +539,17 @@ def expand(self, pvalue):
     return pvalue.pipeline | Read(self._source)
 
 
+class ReadFromTextWithFilename(ReadFromText):
+  r"""A :class:`~apache_beam.io.textio.ReadFromText` for reading text
+  files returning the name of the file and the content of the file.
+
+  This class extend ReadFromText class just setting a different
+  _source_class attribute.
+  """
+
+  _source_class = _TextSourceWithFilename
+
+
 class WriteToText(PTransform):
   """A :class:`~apache_beam.transforms.ptransform.PTransform` for writing to
   text files."""
diff --git a/sdks/python/apache_beam/io/textio_test.py 
b/sdks/python/apache_beam/io/textio_test.py
index 2ed3b4894940..e42e9400099d 100644
--- a/sdks/python/apache_beam/io/textio_test.py
+++ b/sdks/python/apache_beam/io/textio_test.py
@@ -20,6 +20,7 @@
 from __future__ import division
 
 import bz2
+import datetime
 import glob
 import gzip
 import logging
@@ -43,6 +44,7 @@
 from apache_beam.io.textio import _TextSource as TextSource
 # Importing following private classes for testing.
 from apache_beam.io.textio import ReadFromText
+from apache_beam.io.textio import ReadFromTextWithFilename
 from apache_beam.io.textio import WriteToText
 from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.testing.test_utils import TempDir
@@ -347,6 +349,15 @@ def test_read_from_text_single_file(self):
     assert_that(pcoll, equal_to(expected_data))
     pipeline.run()
 
+  def test_read_from_text_with_file_name_single_file(self):
+    file_name, data = write_data(5)
+    expected_data = [(file_name, el) for el in data]
+    assert len(expected_data) == 5
+    pipeline = TestPipeline()
+    pcoll = pipeline | 'Read' >> ReadFromTextWithFilename(file_name)
+    assert_that(pcoll, equal_to(expected_data))
+    pipeline.run()
+
   def test_read_all_single_file(self):
     file_name, expected_data = write_data(5)
     assert len(expected_data) == 5
@@ -416,6 +427,21 @@ def test_read_from_text_file_pattern(self):
     assert_that(pcoll, equal_to(expected_data))
     pipeline.run()
 
+  def test_read_from_text_with_file_name_file_pattern(self):
+    prefix = datetime.datetime.now().strftime("%Y%m%d%H%M%s")
+    file_name_1, data_1 = write_data(5, prefix=prefix)
+    file_name_2, data_2 = write_data(5, prefix=prefix)
+    expected_data = []
+    expected_data.extend([(file_name_1, el) for el in data_1])
+    expected_data.extend([(file_name_2, el) for el in data_2])
+    folder = file_name_1[:file_name_1.rfind(os.path.sep)]
+    pattern = folder + os.path.sep + prefix + '*'
+    assert len(expected_data) == 10
+    pipeline = TestPipeline()
+    pcoll = pipeline | 'Read' >> ReadFromTextWithFilename(pattern)
+    assert_that(pcoll, equal_to(expected_data))
+    pipeline.run()
+
   def test_read_all_file_pattern(self):
     pattern, expected_data = write_pattern([5, 3, 12, 8, 8, 4])
     assert len(expected_data) == 40


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 173414)
    Time Spent: 1h 50m  (was: 1h 40m)

> Create a Class to read content of a file keeping track of the file path 
> (python)
> --------------------------------------------------------------------------------
>
>                 Key: BEAM-6167
>                 URL: https://issues.apache.org/jira/browse/BEAM-6167
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-ideas
>    Affects Versions: 2.8.0
>            Reporter: Lorenzo Caggioni
>            Assignee: Eugene Kirpichov
>            Priority: Minor
>             Fix For: Not applicable
>
>          Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> Add a class to read the content of a file while keeping track of the file
> path each element comes from.
> This is an improvement to the current python/apache_beam/io/textio.py



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to