pabloem commented on a change in pull request #15106:
URL: https://github.com/apache/beam/pull/15106#discussion_r661764871



##########
File path: sdks/python/apache_beam/io/fileio.py
##########
@@ -243,6 +249,57 @@ def process(
     yield ReadableFile(metadata, self._compression)
 
 
+@experimental()
+class MatchContinuously(beam.PTransform):
+  """Checks for new files for a given pattern every interval.
+
+  This ``PTransform`` returns a ``PCollection`` of matching files in the form
+  of ``FileMetadata`` objects.
+  """
+  def __init__(
+      self,
+      file_pattern,
+      interval=360.0,
+      has_deduplication=True,
+      start_timestamp=Timestamp.now(),
+      stop_timestamp=MAX_TIMESTAMP):
+    """Initializes a MatchContinuously transform.
+
+    Args:
+      file_pattern: The file path to read from.
+      interval: Interval at which to check for files.

Review comment:
       Let's specify that the interval is in seconds (right?).

##########
File path: sdks/python/apache_beam/io/fileio_test.py
##########
@@ -320,6 +321,71 @@ def test_transform_on_gcs(self):
           label='Assert Checksums')
 
 
+class MatchContinuouslyTest(_TestCaseWithTempDirCleanUp):
+  def test_with_deduplication(self):
+    files = []
+    tempdir = '%s%s' % (self._new_tempdir(), os.sep)
+
+    # Create a file to be matched before pipeline
+    files.append(self._create_temp_file(dir=tempdir))
+    # Add file name that will be created mid-pipeline
+    files.append(FileSystems.join(tempdir, 'extra'))
+
+    interval = 1
+    start = Timestamp.now()
+    stop = start + interval + 0.1
+
+    def _create_extra_file(element):
+      writer = FileSystems.create(FileSystems.join(tempdir, 'extra'))
+      writer.close()
+      return element.path
+
+    with TestPipeline() as p:
+      match_continiously = (
+          p
+          | fileio.MatchContinuously(
+              file_pattern=FileSystems.join(tempdir, '*'),
+              interval=interval,
+              start_timestamp=start,
+              stop_timestamp=stop)
+          | beam.Map(_create_extra_file))
+
+      assert_that(match_continiously, equal_to(files))
+
+  def test_without_deduplication(self):
+    interval = 1

Review comment:
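       Same as for the `interval = 1` in `test_with_deduplication`: maybe use a
shorter interval so the test will run faster?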

##########
File path: sdks/python/apache_beam/io/fileio.py
##########
@@ -743,3 +800,20 @@ def finish_bundle(self):
               timestamp=key[1].start,
               windows=[key[1]]  # TODO(pabloem) HOW DO WE GET THE PANE
           ))
+
+
+class _RemoveDuplicates(beam.DoFn):
+
+  FILES_STATE = BagStateSpec('files', StrUtf8Coder())
+
+  def process(self, element, file_state=beam.DoFn.StateParam(FILES_STATE)):
+    path = element[0]
+    file_metadata = element[1]
+    bag_content = [x for x in file_state.read()]
+
+    if not bag_content:
+      file_state.add(path)
+      _LOGGER.info('Generated entry for file %s', path)
+      yield file_metadata
+    else:
+      _LOGGER.info('File %s was already read', path)

Review comment:
       Same as for the 'Generated entry for file' log: I think this one should
be `debug` level too.

##########
File path: sdks/python/apache_beam/io/fileio_test.py
##########
@@ -320,6 +321,71 @@ def test_transform_on_gcs(self):
           label='Assert Checksums')
 
 
+class MatchContinuouslyTest(_TestCaseWithTempDirCleanUp):
+  def test_with_deduplication(self):
+    files = []
+    tempdir = '%s%s' % (self._new_tempdir(), os.sep)
+
+    # Create a file to be matched before pipeline
+    files.append(self._create_temp_file(dir=tempdir))
+    # Add file name that will be created mid-pipeline
+    files.append(FileSystems.join(tempdir, 'extra'))
+
+    interval = 1

Review comment:
       Maybe use a shorter interval so the test will run faster? I suppose you
can't run this with a TestStream? Have you tried adding a TestStream that
advances time immediately?

##########
File path: sdks/python/apache_beam/io/fileio.py
##########
@@ -743,3 +800,20 @@ def finish_bundle(self):
               timestamp=key[1].start,
               windows=[key[1]]  # TODO(pabloem) HOW DO WE GET THE PANE
           ))
+
+
+class _RemoveDuplicates(beam.DoFn):
+
+  FILES_STATE = BagStateSpec('files', StrUtf8Coder())
+
+  def process(self, element, file_state=beam.DoFn.StateParam(FILES_STATE)):
+    path = element[0]
+    file_metadata = element[1]
+    bag_content = [x for x in file_state.read()]
+
+    if not bag_content:
+      file_state.add(path)
+      _LOGGER.info('Generated entry for file %s', path)

Review comment:
       I think these logs should be `debug` level. WDYT?

##########
File path: sdks/python/apache_beam/io/fileio.py
##########
@@ -243,6 +249,57 @@ def process(
     yield ReadableFile(metadata, self._compression)
 
 
+@experimental()
+class MatchContinuously(beam.PTransform):
+  """Checks for new files for a given pattern every interval.
+
+  This ``PTransform`` returns a ``PCollection`` of matching files in the form
+  of ``FileMetadata`` objects.

Review comment:
       Java has an EmptyMatchTreatment. Should we add that here too? 
https://beam.apache.org/releases/javadoc/2.30.0/org/apache/beam/sdk/io/fs/EmptyMatchTreatment.html




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to