This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 28818e3  [BEAM-2857] Implementing WriteToFiles transform for fileio (Python) (#8394)
28818e3 is described below

commit 28818e32c078316951da5dbc346d1f918a153d1a
Author: Pablo <pabl...@users.noreply.github.com>
AuthorDate: Tue Jun 4 23:40:48 2019 -0700

    [BEAM-2857] Implementing WriteToFiles transform for fileio (Python) (#8394)
    
    * [BEAM-2857] Implementing WriteToFiles transform for fileio (Python)
---
 sdks/python/apache_beam/io/fileio.py            | 558 +++++++++++++++++++++++-
 sdks/python/apache_beam/io/fileio_test.py       | 374 +++++++++++++++-
 sdks/python/apache_beam/io/filesystem.py        |   5 +-
 sdks/python/apache_beam/testing/util.py         |  19 +
 sdks/python/apache_beam/transforms/core.py      |  30 +-
 sdks/python/apache_beam/transforms/trigger.py   |   1 +
 sdks/python/apache_beam/transforms/window.py    |  16 +-
 sdks/python/apache_beam/utils/windowed_value.py |   8 +-
 8 files changed, 972 insertions(+), 39 deletions(-)

diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index 10890ca..a1a7a58 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -23,17 +23,88 @@ and its metadata; and ``ReadMatches``, which takes in a ``PCollection`` of file
 metadata records, and produces a ``PCollection`` of ``ReadableFile`` objects.
 These transforms currently do not support splitting by themselves.
 
+Writing to Files
+================
+
+The transforms in this file include ``WriteToFiles``, which allows you to write
+a ``beam.PCollection`` to files, and gives you many options to customize how to
+do this.
+
+The ``WriteToFiles`` transform supports bounded and unbounded PCollections
+(i.e. it can be used in both batch and streaming pipelines). For streaming
+pipelines, it does not currently support multiple trigger firings on the
+same window.
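A minimal batch usage sketch (not part of this patch), assuming a local output
path and the defaults described in this module (the default ``TextSink``
expects ``str`` elements, and the default file naming is used)::

    import apache_beam as beam
    from apache_beam.io import fileio

    with beam.Pipeline() as p:
      _ = (p
           | beam.Create(['hello', 'world'])  # str elements for the default TextSink
           | fileio.WriteToFiles(path='/tmp/my_output'))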
+
+File Naming
+-----------
+One of the parameters received by ``WriteToFiles`` is a function specifying
+how to name the files that are written. This function takes the following
+parameters:
+
+- window
+- pane
+- shard_index
+- total_shards
+- compression
+- destination
+
+It should return a file name that is unique for each combination of these
+parameters, as sketched below.
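As an illustration (not part of this patch), a naming callable with that
signature might look like the following; the prefix and the exact format are
arbitrary choices::

    def my_file_naming(window, pane, shard_index, total_shards, compression,
                       destination):
      # Window and pane are ignored here, so names are only unique when a
      # single (global) window with a single firing is expected.
      return 'events-%s-%05d-of-%05d%s' % (
          destination or 'output',
          shard_index or 0,
          total_shards or 0,
          '.%s' % compression if compression else '')

It would then be passed to the transform as ``file_naming=my_file_naming``.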
+
+The default naming strategy is to name files in the format
+`$prefix-$start-$end-$pane-$shard-of-$numShards$suffix$compressionSuffix`,
+where:
+
+- `$prefix` is, by default, `"output"`.
+- `$start` and `$end` are the boundaries of the window for the data being
+  written. These are omitted if we're using the Global window.
+- `$pane` is the index of the trigger firing (pane) within the window.
+- `$shard` and `$numShards` are the current shard number, and the total number
+  of shards for this window firing.
+- `$suffix` is, by default, an empty string, but it can be set by the user via
+  ``default_file_naming``.
+
+Dynamic Destinations
+--------------------
+If the elements in the input ``beam.PCollection`` can be partitioned into
+groups that should be treated differently (e.g. some events are to be stored
+as CSV, while others are to be stored as Avro files), it is possible to do
+this by passing a `destination` parameter to ``WriteToFiles``. Something like
+the following::
+
+    my_pcollection | beam.io.fileio.WriteToFiles(
+          path='/my/file/path',
+          destination=lambda record: 'avro' if record['type'] == 'A' else 'csv',
+          sink=lambda dest: AvroSink() if dest == 'avro' else CsvSink(),
+          file_naming=beam.io.fileio.destination_prefix_naming())
+
+In this transform, each record is written to a destination named `'avro'` or
+`'csv'`, depending on its type. The value returned by the `destination` call
+is then passed to the `sink` call, to determine what sort of sink will be
+used for each destination. The return type of the `destination` parameter can
+be anything, as long as elements can be grouped by it.
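The ``AvroSink`` and ``CsvSink`` classes in the snippet above are placeholders,
not provided by this module. A minimal ``CsvSink`` built on the ``FileSink``
contract (``open``/``write``/``flush``) could be sketched as follows, assuming
dict records and a known header list::

    from apache_beam.io import fileio

    class CsvSink(fileio.FileSink):
      def __init__(self, headers):
        self._headers = headers

      def open(self, fh):
        # WriteToFiles opens the file and hands over the file handler.
        self._fh = fh

      def write(self, record):
        row = ','.join(str(record[h]) for h in self._headers)
        self._fh.write(row.encode('utf8'))
        self._fh.write(b'\n')

      def flush(self):
        self._fh.flush()

(The tests added in this commit take a similar approach, subclassing
``TextSink``.)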
+
 No backward compatibility guarantees. Everything in this module is experimental.
 """
 
 from __future__ import absolute_import
 
+import collections
+import logging
+import random
+import uuid
+
 from past.builtins import unicode
 
 import apache_beam as beam
 from apache_beam.io import filesystem
 from apache_beam.io import filesystems
 from apache_beam.io.filesystem import BeamIOError
+from apache_beam.options.pipeline_options import GoogleCloudOptions
+from apache_beam.options.value_provider import StaticValueProvider
+from apache_beam.options.value_provider import ValueProvider
+from apache_beam.transforms.window import GlobalWindow
 from apache_beam.utils.annotations import experimental
 
 __all__ = ['EmptyMatchTreatment',
@@ -148,10 +219,11 @@ class ReadableFile(object):
     self.metadata = metadata
 
   def open(self, mime_type='text/plain'):
-    return filesystems.FileSystems.open(self.metadata.path)
+    return filesystems.FileSystems.open(self.metadata.path,
+                                        mime_type=mime_type)
 
   def read(self):
-    return self.open().read()
+    return self.open('application/octet-stream').read()
 
   def read_utf8(self):
     return self.open().read().decode('utf-8')
@@ -170,3 +242,485 @@ class ReadMatches(beam.PTransform):
   def expand(self, pcoll):
     return pcoll | beam.ParDo(_ReadMatchesFn(self._compression,
                                              self._skip_directories))
+
+
+class FileSink(object):
+  """Specifies how to write elements to individual files in ``WriteToFiles``.
+
+  **NOTE: THIS CLASS IS EXPERIMENTAL.**
+
+  A Sink class must implement the following:
+
+   - The ``open`` method, which initializes writing to a file handler (it is
+     not responsible for opening the file handler itself).
+   - The ``write`` method, which writes an element to the file that was passed
+     in ``open``.
+   - The ``flush`` method, which flushes any buffered state. This is most often
+     called before closing a file (but not exclusively called in that
+     situation). The sink is not responsible for closing the file handler.
+   """
+
+  def open(self, fh):
+    raise NotImplementedError
+
+  def write(self, record):
+    raise NotImplementedError
+
+  def flush(self):
+    raise NotImplementedError
+
+
+@beam.typehints.with_input_types(str)
+class TextSink(FileSink):
+  """A sink that encodes utf8 elements, and writes to file handlers.
+
+  **NOTE: THIS CLASS IS EXPERIMENTAL.**
+
+  This sink simply calls file_handler.write(record.encode('utf8') + '\n') on
+  all records that come into it.
+  """
+
+  def open(self, fh):
+    self._fh = fh
+
+  def write(self, record):
+    self._fh.write(record.encode('utf8'))
+    self._fh.write(b'\n')
+
+  def flush(self):
+    self._fh.flush()
+
+
+def prefix_naming(prefix):
+  return default_file_naming(prefix)
+
+
+_DEFAULT_FILE_NAME_TEMPLATE = (
+    '{prefix}-{start}-{end}-{pane}-'
+    '{shard:05d}-{total_shards:05d}'
+    '{suffix}{compression}')
+
+
+def destination_prefix_naming():
+
+  def _inner(window, pane, shard_index, total_shards, compression, destination):
+    kwargs = {'prefix': str(destination),
+              'start': '',
+              'end': '',
+              'pane': '',
+              'shard': 0,
+              'total_shards': 0,
+              'suffix': '',
+              'compression': ''}
+    if total_shards is not None and shard_index is not None:
+      kwargs['shard'] = int(shard_index)
+      kwargs['total_shards'] = int(total_shards)
+
+    if window != GlobalWindow():
+      kwargs['start'] = window.start.to_utc_datetime().isoformat()
+      kwargs['end'] = window.end.to_utc_datetime().isoformat()
+
+    # TODO(BEAM-3759): Add support for PaneInfo
+    # If the PANE is the ONLY firing in the window, we don't add it.
+    #if pane and not (pane.is_first and pane.is_last):
+    #  kwargs['pane'] = pane.index
+
+    if compression:
+      kwargs['compression'] = '.%s' % compression
+
+    return _DEFAULT_FILE_NAME_TEMPLATE.format(**kwargs)
+
+  return _inner
+
+
+def default_file_naming(prefix, suffix=None):
+
+  def _inner(window, pane, shard_index, total_shards, compression, destination):
+    kwargs = {'prefix': prefix,
+              'start': '',
+              'end': '',
+              'pane': '',
+              'shard': 0,
+              'total_shards': 0,
+              'suffix': '',
+              'compression': ''}
+    if total_shards is not None and shard_index is not None:
+      kwargs['shard'] = int(shard_index)
+      kwargs['total_shards'] = int(total_shards)
+
+    if window != GlobalWindow():
+      kwargs['start'] = window.start.to_utc_datetime().isoformat()
+      kwargs['end'] = window.end.to_utc_datetime().isoformat()
+
+    # TODO(pabloem): Add support for PaneInfo
+    # If the PANE is the ONLY firing in the window, we don't add it.
+    #if pane and not (pane.is_first and pane.is_last):
+    #  kwargs['pane'] = pane.index
+
+    if compression:
+      kwargs['compression'] = '.%s' % compression
+    if suffix:
+      kwargs['suffix'] = suffix
+
+    return _DEFAULT_FILE_NAME_TEMPLATE.format(**kwargs)
+
+  return _inner
+
+
+_FileResult = collections.namedtuple('FileResult',
+                                     ['file_name',
+                                      'shard_index',
+                                      'total_shards',
+                                      'window',
+                                      'pane',
+                                      'destination'])
+
+
+# Adding a class to contain PyDoc.
+class FileResult(_FileResult):
+  """A descriptor of a file that has been written."""
+  pass
+
+
+@experimental()
+class WriteToFiles(beam.PTransform):
+  """Write the incoming PCollection to a set of output files.
+
+  The incoming ``PCollection`` may be bounded or unbounded.
+
+  **Note:** For unbounded ``PCollection``s, this transform does not support
+  multiple firings per window (because, at the moment, files are named only
+  by their destination and window).
+  """
+
+  # We allow up to 20 different destinations to be written in a single bundle.
+  # Too many files will add memory pressure to the worker, so we let it be 20.
+  MAX_NUM_WRITERS_PER_BUNDLE = 20
+
+  DEFAULT_SHARDING = 5
+
+  def __init__(self,
+               path,
+               file_naming=None,
+               destination=None,
+               temp_directory=None,
+               sink=None,
+               shards=None,
+               output_fn=None,
+               max_writers_per_bundle=MAX_NUM_WRITERS_PER_BUNDLE):
+    """Initializes a WriteToFiles transform.
+
+    Args:
+      path (str, ValueProvider): The directory to write files into.
+      file_naming (callable): A callable that takes in a window, pane,
+        shard_index, total_shards, compression and destination; and returns
+        a file name.
+      destination (callable): A callable that takes an element and returns
+        the destination it should be written to. If this argument is
+        provided, the sink parameter must also be a callable.
+      temp_directory (str, ValueProvider): To ensure atomicity in the
+        transform, the output is written into temporary files, which are
+        written to a directory that is meant to be temporary as well. Once the
+        whole output has been written, the files are moved into their final
+        destination, and given their final names. By default, the temporary
+        directory will be within the temp_location of your pipeline.
+      sink (callable, FileSink): The sink to use to write into a file. It
+        should implement the methods of a ``FileSink``. If none is provided,
+        a ``TextSink`` is used.
+      shards (int): The number of shards per destination and trigger firing.
+      max_writers_per_bundle (int): The number of writers that can be open
+        concurrently in a single worker that's processing one bundle.
+    """
+    self.path = (
+        path if isinstance(path, ValueProvider) else StaticValueProvider(str,
+                                                                         path))
+    self.file_naming_fn = file_naming or default_file_naming('output')
+    self.destination_fn = self._get_destination_fn(destination)
+    self._temp_directory = temp_directory
+    self.sink_fn = self._get_sink_fn(sink)
+    self.shards = shards or WriteToFiles.DEFAULT_SHARDING
+    self.output_fn = output_fn or (lambda x: x)
+
+    self._max_num_writers_per_bundle = max_writers_per_bundle
+
+  @staticmethod
+  def _get_sink_fn(input_sink):
+    if isinstance(input_sink, FileSink):
+      return lambda x: input_sink
+    elif callable(input_sink):
+      return input_sink
+    else:
+      return lambda x: TextSink()
+
+  @staticmethod
+  def _get_destination_fn(destination):
+    if isinstance(destination, ValueProvider):
+      return lambda elm: destination.get()
+    elif callable(destination):
+      return destination
+    else:
+      return lambda elm: destination
+
+  def expand(self, pcoll):
+    p = pcoll.pipeline
+
+    if not self._temp_directory:
+      temp_location = (
+          p.options.view_as(GoogleCloudOptions).temp_location
+          or self.path.get())
+      dir_uid = str(uuid.uuid4())
+      self._temp_directory = StaticValueProvider(
+          str,
+          filesystems.FileSystems.join(temp_location,
+                                       '.temp%s' % dir_uid))
+      logging.info('Added temporary directory %s', self._temp_directory.get())
+
+    output = (pcoll
+              | beam.ParDo(_WriteUnshardedRecordsFn(
+                  base_path=self._temp_directory,
+                  destination_fn=self.destination_fn,
+                  sink_fn=self.sink_fn,
+                  max_writers_per_bundle=self._max_num_writers_per_bundle))
+              .with_outputs(_WriteUnshardedRecordsFn.SPILLED_RECORDS,
+                            _WriteUnshardedRecordsFn.WRITTEN_FILES))
+
+    written_files_pc = output[_WriteUnshardedRecordsFn.WRITTEN_FILES]
+    spilled_records_pc = output[_WriteUnshardedRecordsFn.SPILLED_RECORDS]
+
+    more_written_files_pc = (
+        spilled_records_pc
+        | beam.ParDo(_AppendShardedDestination(self.destination_fn,
+                                               self.shards))
+        | "GroupRecordsByDestinationAndShard" >> beam.GroupByKey()
+        | beam.ParDo(_WriteShardedRecordsFn(self._temp_directory,
+                                            self.sink_fn,
+                                            self.shards))
+    )
+
+    files_by_destination_pc = (
+        (written_files_pc, more_written_files_pc)
+        | beam.Flatten()
+        | beam.Map(lambda file_result: (file_result.destination, file_result))
+        | "GroupTempFilesByDestination" >> beam.GroupByKey())
+
+    # Now we take the temporary files and move them to their final
+    # destination, with their proper names.
+
+    file_results = (files_by_destination_pc
+                    | beam.ParDo(
+                        _MoveTempFilesIntoFinalDestinationFn(
+                            self.path, self.file_naming_fn,
+                            self._temp_directory)))
+
+    return file_results
+
+
+def _create_writer(base_path, writer_key):
+  try:
+    filesystems.FileSystems.mkdirs(base_path)
+  except IOError:
+    # Directory already exists.
+    pass
+
+  # The file name has a prefix determined by destination+window, along with
+  # a random string. This allows us to retrieve orphaned files later on.
+  file_name = '%s_%s' % (abs(hash(writer_key)), uuid.uuid4())
+  full_file_name = filesystems.FileSystems.join(base_path, file_name)
+  return full_file_name, filesystems.FileSystems.create(full_file_name)
+
+
+class _MoveTempFilesIntoFinalDestinationFn(beam.DoFn):
+
+  def __init__(self, path, file_naming_fn, temp_dir):
+    self.path = path
+    self.file_naming_fn = file_naming_fn
+    self.temporary_directory = temp_dir
+
+  def process(self,
+              element,
+              w=beam.DoFn.WindowParam):
+    destination = element[0]
+    file_results = list(element[1])
+
+    for i, r in enumerate(file_results):
+      # TODO(pabloem): Handle compression for files.
+      final_file_name = self.file_naming_fn(r.window,
+                                            r.pane,
+                                            i,
+                                            len(file_results),
+                                            '',
+                                            destination)
+
+      logging.info('Moving temporary file %s to dir: %s as %s. Res: %s',
+                   r.file_name, self.path.get(), final_file_name, r)
+
+      final_full_path = filesystems.FileSystems.join(self.path.get(),
+                                                     final_file_name)
+
+      # TODO(pabloem): Batch rename requests?
+      try:
+        filesystems.FileSystems.rename([r.file_name],
+                                       [final_full_path])
+      except BeamIOError:
+        # This error is not serious, because it may happen on a retry of the
+        # bundle. We simply log it.
+        logging.debug('File %s failed to be copied. This may be due to a bundle'
+                      ' being retried.', r.file_name)
+
+      yield FileResult(final_file_name,
+                       i,
+                       len(file_results),
+                       r.window,
+                       r.pane,
+                       destination)
+
+    logging.info('Cautiously removing temporary files for'
+                 ' destination %s and window %s', destination, w)
+    writer_key = (destination, w)
+    self._remove_temporary_files(writer_key)
+
+  def _remove_temporary_files(self, writer_key):
+    try:
+      prefix = filesystems.FileSystems.join(
+          self.temporary_directory.get(), str(abs(hash(writer_key))))
+      match_result = filesystems.FileSystems.match(['%s*' % prefix])
+      orphaned_files = [m.path for m in match_result[0].metadata_list]
+
+      logging.debug('Deleting orphaned files: %s', orphaned_files)
+      filesystems.FileSystems.delete(orphaned_files)
+    except BeamIOError as e:
+      logging.debug('Exceptions when deleting files: %s', e)
+
+
+class _WriteShardedRecordsFn(beam.DoFn):
+
+  def __init__(self, base_path, sink_fn, shards):
+    self.base_path = base_path
+    self.sink_fn = sink_fn
+    self.shards = shards
+
+  def process(self,
+              element,
+              w=beam.DoFn.WindowParam,
+              pane=beam.DoFn.PaneInfoParam):
+    destination_and_shard = element[0]
+    destination = destination_and_shard[0]
+    shard = destination_and_shard[1]
+    records = element[1]
+
+    full_file_name, writer = _create_writer(base_path=self.base_path.get(),
+                                            writer_key=(destination, w))
+    sink = self.sink_fn(destination)
+    sink.open(writer)
+
+    for r in records:
+      sink.write(r)
+
+    sink.flush()
+    writer.close()
+
+    logging.info('Writing file %s for destination %s and shard %s',
+                 full_file_name, destination, repr(shard))
+
+    yield FileResult(full_file_name,
+                     shard_index=shard,
+                     total_shards=self.shards,
+                     window=w,
+                     pane=pane,
+                     destination=destination)
+
+
+class _AppendShardedDestination(beam.DoFn):
+
+  def __init__(self, destination, shards):
+    self.destination_fn = destination
+    self.shards = shards
+
+    # We start the shards for a single destination at an arbitrary point.
+    self._shard_counter = collections.defaultdict(
+        lambda: random.randrange(self.shards))
+
+  def _next_shard_for_destination(self, destination):
+    self._shard_counter[destination] = (
+        (self._shard_counter[destination] + 1) % self.shards)
+
+    return self._shard_counter[destination]
+
+  def process(self, record):
+    destination = self.destination_fn(record)
+    shard = self._next_shard_for_destination(destination)
+
+    yield ((destination, shard), record)
+
+
+class _WriteUnshardedRecordsFn(beam.DoFn):
+
+  SPILLED_RECORDS = 'spilled_records'
+  WRITTEN_FILES = 'written_files'
+
+  def __init__(self,
+               base_path,
+               destination_fn,
+               sink_fn,
+               max_writers_per_bundle=WriteToFiles.MAX_NUM_WRITERS_PER_BUNDLE):
+    self.base_path = base_path
+    self.destination_fn = destination_fn
+    self.sink_fn = sink_fn
+    self.max_num_writers_per_bundle = max_writers_per_bundle
+
+  def start_bundle(self):
+    self._writers_and_sinks = {}
+    self._file_names = {}
+
+  def process(self,
+              record,
+              w=beam.DoFn.WindowParam,
+              pane=beam.DoFn.PaneInfoParam):
+    destination = self.destination_fn(record)
+
+    writer, sink = self._get_or_create_writer_and_sink(destination, w)
+
+    if not writer:
+      return [beam.pvalue.TaggedOutput(self.SPILLED_RECORDS, record)]
+    else:
+      sink.write(record)
+
+  def _get_or_create_writer_and_sink(self, destination, window):
+    """Returns a tuple of writer, sink."""
+    writer_key = (destination, window)
+
+    if writer_key in self._writers_and_sinks:
+      return self._writers_and_sinks.get(writer_key)
+    elif len(self._writers_and_sinks) >= self.max_num_writers_per_bundle:
+      # The writer does not exist, and we have too many writers already.
+      return None, None
+    else:
+      # The writer does not exist, but we can still create a new one.
+      full_file_name, writer = _create_writer(base_path=self.base_path.get(),
+                                              writer_key=writer_key)
+      sink = self.sink_fn(destination)
+
+      sink.open(writer)
+
+      self._writers_and_sinks[writer_key] = (writer, sink)
+      self._file_names[writer_key] = full_file_name
+      return self._writers_and_sinks[writer_key]
+
+  def finish_bundle(self):
+    for key, (writer, sink) in self._writers_and_sinks.items():
+
+      sink.flush()
+      writer.close()
+
+      file_result = FileResult(self._file_names[key],
+                               shard_index=-1,
+                               total_shards=0,
+                               window=key[1],
+                               pane=None,  # TODO(pabloem): get the pane info
+                               destination=key[0])
+
+      yield beam.pvalue.TaggedOutput(
+          self.WRITTEN_FILES,
+          beam.transforms.window.WindowedValue(
+              file_result,
+              timestamp=key[1].start,
+              windows=[key[1]]  # TODO(pabloem) HOW DO WE GET THE PANE
+          ))
diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py
index 096149b..c533ef8 100644
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -21,20 +21,38 @@ from __future__ import absolute_import
 
 import csv
 import io
+import json
 import logging
 import os
 import sys
 import unittest
+import uuid
 
+from hamcrest.library.text import stringmatches
 from nose.plugins.attrib import attr
 
 import apache_beam as beam
 from apache_beam.io import fileio
 from apache_beam.io.filebasedsink_test import _TestCaseWithTempDirCleanUp
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import StandardOptions
 from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.test_stream import TestStream
 from apache_beam.testing.test_utils import compute_hash
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
+from apache_beam.testing.util import matches_all
+from apache_beam.transforms import trigger
+from apache_beam.transforms.window import FixedWindows
+from apache_beam.transforms.window import GlobalWindow
+
+
+def _get_file_reader(readable_file):
+  if sys.version_info >= (3, 0):
+    return io.TextIOWrapper(readable_file.open())
+  else:
+    return readable_file.open()
 
 
 class MatchTest(_TestCaseWithTempDirCleanUp):
@@ -48,7 +66,9 @@ class MatchTest(_TestCaseWithTempDirCleanUp):
     files.append(self._create_temp_file(dir=tempdir))
 
     with TestPipeline() as p:
-      files_pc = p | fileio.MatchFiles(tempdir) | beam.Map(lambda x: x.path)
+      files_pc = (p
+                  | fileio.MatchFiles(FileSystems.join(tempdir, '*'))
+                  | beam.Map(lambda x: x.path))
 
       assert_that(files_pc, equal_to(files))
 
@@ -66,7 +86,7 @@ class MatchTest(_TestCaseWithTempDirCleanUp):
 
     with TestPipeline() as p:
       files_pc = (p
-                  | beam.Create(directories)
+                  | beam.Create([FileSystems.join(d, '*') for d in directories])
                   | fileio.MatchAll()
                   | beam.Map(lambda x: x.path))
 
@@ -85,7 +105,7 @@ class MatchTest(_TestCaseWithTempDirCleanUp):
       with TestPipeline() as p:
         files_pc = (
             p
-            | beam.Create(directories)
+            | beam.Create([FileSystems.join(d, '*') for d in directories])
             | fileio.MatchAll(fileio.EmptyMatchTreatment.DISALLOW)
             | beam.Map(lambda x: x.path))
 
@@ -103,7 +123,7 @@ class MatchTest(_TestCaseWithTempDirCleanUp):
     with TestPipeline() as p:
       files_pc = (
           p
-          | beam.Create(['%s*' % d for d in directories])
+          | beam.Create([FileSystems.join(d, '*') for d in directories])
           | fileio.MatchAll(fileio.EmptyMatchTreatment.ALLOW_IF_WILDCARD)
           | beam.Map(lambda x: x.path))
 
@@ -119,7 +139,7 @@ class ReadTest(_TestCaseWithTempDirCleanUp):
 
     with TestPipeline() as p:
       content_pc = (p
-                    | beam.Create([dir])
+                    | beam.Create([FileSystems.join(dir, '*')])
                     | fileio.MatchAll()
                     | fileio.ReadMatches()
                     | beam.FlatMap(
@@ -134,18 +154,12 @@ class ReadTest(_TestCaseWithTempDirCleanUp):
     dir = '%s%s' % (self._new_tempdir(), os.sep)
     self._create_temp_file(dir=dir, content=content)
 
-    def get_csv_reader(readable_file):
-      if sys.version_info >= (3, 0):
-        return csv.reader(io.TextIOWrapper(readable_file.open()))
-      else:
-        return csv.reader(readable_file.open())
-
     with TestPipeline() as p:
       content_pc = (p
-                    | beam.Create([dir])
+                    | beam.Create([FileSystems.join(dir, '*')])
                     | fileio.MatchAll()
                     | fileio.ReadMatches()
-                    | beam.FlatMap(get_csv_reader))
+                    | beam.FlatMap(lambda rf: csv.reader(_get_file_reader(rf))))
 
       assert_that(content_pc, equal_to(rows))
 
@@ -160,7 +174,7 @@ class ReadTest(_TestCaseWithTempDirCleanUp):
 
     with TestPipeline() as p:
       contents_pc = (p
-                     | beam.Create(files + [tempdir])
+                     | beam.Create(files + ['%s/' % tempdir])
                      | fileio.ReadMatches()
                      | beam.FlatMap(
                          lambda x: x.read().decode('utf-8').splitlines()))
@@ -179,7 +193,7 @@ class ReadTest(_TestCaseWithTempDirCleanUp):
     with self.assertRaises(beam.io.filesystem.BeamIOError):
       with TestPipeline() as p:
         _ = (p
-             | beam.Create(files + [tempdir])
+             | beam.Create(files + ['%s/' % tempdir])
              | fileio.ReadMatches(skip_directories=False)
              | beam.Map(lambda x: x.read_utf8()))
 
@@ -233,6 +247,336 @@ class MatchIntegrationTest(unittest.TestCase):
                   label='Assert Checksums')
 
 
+class WriteFilesTest(_TestCaseWithTempDirCleanUp):
+
+  SIMPLE_COLLECTION = [
+      {'project': 'beam', 'foundation': 'apache'},
+      {'project': 'prometheus', 'foundation': 'cncf'},
+      {'project': 'flink', 'foundation': 'apache'},
+      {'project': 'grpc', 'foundation': 'cncf'},
+      {'project': 'spark', 'foundation': 'apache'},
+      {'project': 'kubernetes', 'foundation': 'cncf'},
+      {'project': 'spark', 'foundation': 'apache'},
+      {'project': 'knative', 'foundation': 'cncf'},
+      {'project': 'linux', 'foundation': 'linux'},
+  ]
+
+  LARGER_COLLECTION = ['{:05d}'.format(i) for i in range(200)]
+
+  CSV_HEADERS = ['project', 'foundation']
+
+  SIMPLE_COLLECTION_VALIDATION_SET = set([
+      (elm['project'], elm['foundation']) for elm in SIMPLE_COLLECTION])
+
+  class CsvSink(fileio.TextSink):
+    def __init__(self, headers):
+      self.headers = headers
+
+    def write(self, record):
+      self._fh.write(','.join([record[h] for h in self.headers]).encode('utf8'))
+      self._fh.write('\n'.encode('utf8'))
+
+  class JsonSink(fileio.TextSink):
+
+    def write(self, record):
+      self._fh.write(json.dumps(record).encode('utf8'))
+      self._fh.write('\n'.encode('utf8'))
+
+  def test_write_to_single_file_batch(self):
+
+    dir = self._new_tempdir()
+
+    with TestPipeline() as p:
+      _ = (p
+           | beam.Create(WriteFilesTest.SIMPLE_COLLECTION)
+           | "Serialize" >> beam.Map(json.dumps)
+           | beam.io.fileio.WriteToFiles(path=dir))
+
+    with TestPipeline() as p:
+      result = (p
+                | fileio.MatchFiles(FileSystems.join(dir, '*'))
+                | fileio.ReadMatches()
+                | beam.FlatMap(lambda f: f.read_utf8().strip().split('\n')))
+
+      assert_that(result,
+                  equal_to([json.dumps(row) for row in self.SIMPLE_COLLECTION]))
+
+  def test_write_to_different_file_types_some_spilling(self):
+
+    dir = self._new_tempdir()
+
+    with TestPipeline() as p:
+      _ = (p
+           | beam.Create(WriteFilesTest.SIMPLE_COLLECTION)
+           | beam.io.fileio.WriteToFiles(
+               path=dir,
+               destination=lambda record: record['foundation'],
+               sink=lambda dest: (
+                   WriteFilesTest.CsvSink(WriteFilesTest.CSV_HEADERS)
+                   if dest == 'apache' else WriteFilesTest.JsonSink()),
+               file_naming=fileio.destination_prefix_naming(),
+               max_writers_per_bundle=1))
+
+    with TestPipeline() as p:
+      cncf_res = (p
+                  | fileio.MatchFiles(FileSystems.join(dir, 'cncf*'))
+                  | fileio.ReadMatches()
+                  | beam.FlatMap(lambda f: f.read_utf8().strip().split('\n')))
+
+      apache_res = (p
+                    | "MatchApache" >> fileio.MatchFiles(
+                        FileSystems.join(dir, 'apache*'))
+                    | "ReadApache" >> fileio.ReadMatches()
+                    | "MapApache" >> beam.FlatMap(
+                        lambda rf: csv.reader(_get_file_reader(rf))))
+
+      assert_that(cncf_res,
+                  equal_to([json.dumps(row)
+                            for row in self.SIMPLE_COLLECTION
+                            if row['foundation'] == 'cncf']),
+                  label='verifyCNCF')
+
+      assert_that(apache_res,
+                  equal_to([[row['project'], row['foundation']]
+                            for row in self.SIMPLE_COLLECTION
+                            if row['foundation'] == 'apache']),
+                  label='verifyApache')
+
+  def test_find_orphaned_files(self):
+    dir = self._new_tempdir()
+
+    write_transform = beam.io.fileio.WriteToFiles(path=dir)
+
+    def write_orphaned_file(temp_dir, writer_key):
+      temp_dir_path = FileSystems.join(dir, temp_dir)
+
+      file_prefix_dir = FileSystems.join(
+          temp_dir_path,
+          str(abs(hash(writer_key))))
+
+      file_name = '%s_%s' % (file_prefix_dir, uuid.uuid4())
+      with FileSystems.create(file_name) as f:
+        f.write(b'Hello y\'all')
+
+      return file_name
+
+    with TestPipeline() as p:
+      _ = (p
+           | beam.Create(WriteFilesTest.SIMPLE_COLLECTION)
+           | "Serialize" >> beam.Map(json.dumps)
+           | write_transform)
+
+      # Pre-create the temp directory.
+      temp_dir_path = FileSystems.mkdirs(FileSystems.join(
+          dir, write_transform._temp_directory.get()))
+      write_orphaned_file(write_transform._temp_directory.get(),
+                          (None, GlobalWindow()))
+      f2 = write_orphaned_file(write_transform._temp_directory.get(),
+                               ('other-dest', GlobalWindow()))
+
+    temp_dir_path = FileSystems.join(dir, write_transform._temp_directory.get())
+    leftovers = FileSystems.match(['%s%s*' % (temp_dir_path, os.sep)])
+    found_files = [m.path for m in leftovers[0].metadata_list]
+    self.assertListEqual(found_files, [f2])
+
+  def test_write_to_different_file_types(self):
+
+    dir = self._new_tempdir()
+
+    with TestPipeline() as p:
+      _ = (p
+           | beam.Create(WriteFilesTest.SIMPLE_COLLECTION)
+           | beam.io.fileio.WriteToFiles(
+               path=dir,
+               destination=lambda record: record['foundation'],
+               sink=lambda dest: (
+                   WriteFilesTest.CsvSink(WriteFilesTest.CSV_HEADERS)
+                   if dest == 'apache' else WriteFilesTest.JsonSink()),
+               file_naming=fileio.destination_prefix_naming()))
+
+    with TestPipeline() as p:
+      cncf_res = (p
+                  | fileio.MatchFiles(FileSystems.join(dir, 'cncf*'))
+                  | fileio.ReadMatches()
+                  | beam.FlatMap(lambda f: f.read_utf8().strip().split('\n')))
+
+      apache_res = (p
+                    | "MatchApache" >> fileio.MatchFiles(
+                        FileSystems.join(dir, 'apache*'))
+                    | "ReadApache" >> fileio.ReadMatches()
+                    | "MapApache" >> beam.FlatMap(
+                        lambda rf: csv.reader(_get_file_reader(rf))))
+
+      assert_that(cncf_res,
+                  equal_to([json.dumps(row)
+                            for row in self.SIMPLE_COLLECTION
+                            if row['foundation'] == 'cncf']),
+                  label='verifyCNCF')
+
+      assert_that(apache_res,
+                  equal_to([[row['project'], row['foundation']]
+                            for row in self.SIMPLE_COLLECTION
+                            if row['foundation'] == 'apache']),
+                  label='verifyApache')
+
+  def record_dofn(self):
+    class RecordDoFn(beam.DoFn):
+      def process(self, element):
+        WriteFilesTest.all_records.append(element)
+
+    return RecordDoFn()
+
+  def test_streaming_complex_timing(self):
+    # Use state on the TestCase class, since other references would be pickled
+    # into a closure and not have the desired side effects.
+    #
+    # TODO(BEAM-5295): Use assert_that after it works for the cases here in
+    # streaming mode.
+    WriteFilesTest.all_records = []
+
+    dir = self._new_tempdir()
+
+    # Setting up the input (TestStream)
+    ts = TestStream().advance_watermark_to(0)
+    for elm in WriteFilesTest.LARGER_COLLECTION:
+      timestamp = int(elm)
+
+      ts.add_elements([('key', '%s' % elm)])
+      if timestamp % 5 == 0 and timestamp != 0:
+        # TODO(BEAM-3759): Add many firings per window after getting PaneInfo.
+        ts.advance_processing_time(5)
+        ts.advance_watermark_to(timestamp)
+
+    # The pipeline that we are testing
+    options = PipelineOptions()
+    options.view_as(StandardOptions).streaming = True
+    with TestPipeline(options=options) as p:
+      res = (p
+             | ts
+             | beam.WindowInto(
+                 FixedWindows(10),
+                 trigger=trigger.AfterWatermark(),
+                 accumulation_mode=trigger.AccumulationMode.DISCARDING)
+             | beam.GroupByKey()
+             | beam.FlatMap(lambda x: x[1]))
+      # Triggering on the watermark, and discarding old elements.
+
+      _ = (res
+           | beam.io.fileio.WriteToFiles(path=dir,
+                                         max_writers_per_bundle=0)
+           | beam.Map(lambda fr: FileSystems.join(dir, fr.file_name))
+           | beam.ParDo(self.record_dofn()))
+
+    # Verification pipeline
+    with TestPipeline() as p:
+      files = (p | beam.io.fileio.MatchFiles(FileSystems.join(dir, '*')))
+
+      file_names = (files | beam.Map(lambda fm: fm.path))
+
+      file_contents = (
+          files
+          | beam.io.fileio.ReadMatches()
+          | beam.Map(lambda rf: (rf.metadata.path,
+                                 rf.read_utf8().strip().split('\n'))))
+
+      content = (file_contents
+                 | beam.FlatMap(lambda fc: [ln.strip() for ln in fc[1]]))
+
+      assert_that(file_names, equal_to(WriteFilesTest.all_records),
+                  label='AssertFilesMatch')
+      assert_that(content, matches_all(WriteFilesTest.LARGER_COLLECTION),
+                  label='AssertContentsMatch')
+
+  def test_streaming_different_file_types(self):
+    dir = self._new_tempdir()
+    input = iter(WriteFilesTest.SIMPLE_COLLECTION)
+    ts = (TestStream()
+          .advance_watermark_to(0)
+          .add_elements([next(input), next(input)])
+          .advance_watermark_to(10)
+          .add_elements([next(input), next(input)])
+          .advance_watermark_to(20)
+          .add_elements([next(input), next(input)])
+          .advance_watermark_to(30)
+          .add_elements([next(input), next(input)])
+          .advance_watermark_to(40))
+
+    with TestPipeline() as p:
+      _ = (p
+           | ts
+           | beam.WindowInto(FixedWindows(10))
+           | beam.io.fileio.WriteToFiles(
+               path=dir,
+               destination=lambda record: record['foundation'],
+               sink=lambda dest: (
+                   WriteFilesTest.CsvSink(WriteFilesTest.CSV_HEADERS)
+                   if dest == 'apache' else WriteFilesTest.JsonSink()),
+               file_naming=fileio.destination_prefix_naming(),
+               max_writers_per_bundle=0,
+           ))
+
+    with TestPipeline() as p:
+      cncf_files = (p
+                    | fileio.MatchFiles(FileSystems.join(dir, 'cncf*'))
+                    | "CncfFileNames" >> beam.Map(lambda fm: fm.path))
+
+      apache_files = (p
+                      | "MatchApache" >> fileio.MatchFiles(
+                          FileSystems.join(dir, 'apache*'))
+                      | "ApacheFileNames" >> beam.Map(lambda fm: fm.path))
+
+      assert_that(cncf_files,
+                  matches_all([
+                      stringmatches.matches_regexp(
+                          FileSystems.join(
+                              dir,
+                              'cncf-1970-01-01T00:00:00-1970-01-01T00:00:10--.*'
+                          )
+                      ),
+                      stringmatches.matches_regexp(
+                          FileSystems.join(
+                              dir,
+                              'cncf-1970-01-01T00:00:10-1970-01-01T00:00:20--.*'
+                          )
+                      ),
+                      stringmatches.matches_regexp(
+                          FileSystems.join(
+                              dir,
+                              'cncf-1970-01-01T00:00:20-1970-01-01T00:00:30--.*'
+                          )
+                      ),
+                      stringmatches.matches_regexp(
+                          FileSystems.join(
+                              dir,
+                              'cncf-1970-01-01T00:00:30-1970-01-01T00:00:40--.*'
+                          )
+                      )
+                  ]),
+                  label='verifyCNCFFiles')
+
+      assert_that(apache_files,
+                  matches_all([
+                      stringmatches.matches_regexp(FileSystems.join(
+                          dir,
+                          'apache-1970-01-01T00:00:00-1970-01-01T00:00:10--.*')
+                                                  ),
+                      stringmatches.matches_regexp(FileSystems.join(
+                          dir,
+                          'apache-1970-01-01T00:00:10-1970-01-01T00:00:20--.*')
+                                                  ),
+                      stringmatches.matches_regexp(FileSystems.join(
+                          dir,
+                          'apache-1970-01-01T00:00:20-1970-01-01T00:00:30--.*')
+                                                  ),
+                      stringmatches.matches_regexp(FileSystems.join(
+                          dir,
+                          'apache-1970-01-01T00:00:30-1970-01-01T00:00:40--.*')
+                                                  )
+                  ]),
+                  label='verifyApacheFiles')
+
+
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
   unittest.main()
diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py
index cfdf472..efc745a 100644
--- a/sdks/python/apache_beam/io/filesystem.py
+++ b/sdks/python/apache_beam/io/filesystem.py
@@ -400,8 +400,7 @@ class CompressedFile(object):
 
 
 class FileMetadata(object):
-  """Metadata about a file path that is the output of FileSystem.match
-  """
+  """Metadata about a file path that is the output of FileSystem.match."""
   def __init__(self, path, size_in_bytes):
     assert isinstance(path, (str, unicode)) and path, "Path should be a string"
     assert isinstance(size_in_bytes, (int, long)) and size_in_bytes >= 0, \
@@ -430,7 +429,7 @@ class FileMetadata(object):
 
 class MatchResult(object):
   """Result from the ``FileSystem`` match operation which contains the list
-   of matched FileMetadata.
+   of matched ``FileMetadata``.
   """
   def __init__(self, pattern, metadata_list):
     self.metadata_list = metadata_list
diff --git a/sdks/python/apache_beam/testing/util.py b/sdks/python/apache_beam/testing/util.py
index 3099b0f..2e38d37 100644
--- a/sdks/python/apache_beam/testing/util.py
+++ b/sdks/python/apache_beam/testing/util.py
@@ -25,6 +25,9 @@ import io
 import tempfile
 from builtins import object
 
+from hamcrest.core import assert_that as hamcrest_assert
+from hamcrest.library.collection import contains_inanyorder
+
 from apache_beam import pvalue
 from apache_beam.transforms import window
 from apache_beam.transforms.core import Create
@@ -41,6 +44,7 @@ __all__ = [
     'equal_to',
     'is_empty',
     'is_not_empty',
+    'matches_all',
     # open_shards is internal and has no backwards compatibility guarantees.
     'open_shards',
     'TestWindowedValue',
@@ -142,6 +146,21 @@ def equal_to(expected):
   return _equal
 
 
+def matches_all(expected):
+  """Matcher used by assert_that to check a set of matchers.
+
+  Args:
+    expected: A list of elements or hamcrest matchers to be used to match
+      the elements of a single PCollection.
+  """
+  def _matches(actual):
+    expected_list = list(expected)
+
+    hamcrest_assert(actual, contains_inanyorder(*expected_list))
+
+  return _matches
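A usage sketch for ``matches_all`` (the element values and regular expressions
here are illustrative only, not part of this patch)::

    import apache_beam as beam
    from apache_beam.testing.test_pipeline import TestPipeline
    from apache_beam.testing.util import assert_that
    from apache_beam.testing.util import matches_all
    from hamcrest.library.text import stringmatches

    with TestPipeline() as p:
      output = p | beam.Create(['apple-1', 'banana-2'])
      # Each matcher must be satisfied by exactly one element, in any order.
      assert_that(output, matches_all([
          stringmatches.matches_regexp(r'apple-\d'),
          stringmatches.matches_regexp(r'banana-\d')]))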
+
+
 def is_empty():
   def _empty(actual):
     actual = list(actual)
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index ca1fb46..ead094b 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -420,11 +420,12 @@ class DoFn(WithTypeHints, HasDisplayData, urns.RunnerApiFn):
   SideInputParam = _DoFnParam('SideInputParam')
   TimestampParam = _DoFnParam('TimestampParam')
   WindowParam = _DoFnParam('WindowParam')
+  PaneInfoParam = _DoFnParam('PaneInfoParam')
   WatermarkReporterParam = _DoFnParam('WatermarkReporterParam')
   BundleFinalizerParam = _BundleFinalizerParam
 
   DoFnProcessParams = [ElementParam, SideInputParam, TimestampParam,
-                       WindowParam, WatermarkReporterParam,
+                       WindowParam, WatermarkReporterParam, PaneInfoParam,
                        BundleFinalizerParam]
 
   # Parameters to access state and timers.  Not restricted to use only in the
@@ -2065,11 +2066,15 @@ class WindowInto(ParDo):
       new_windows = self.windowing.windowfn.assign(context)
       yield WindowedValue(element, context.timestamp, new_windows)
 
-  def __init__(self, windowfn, **kwargs):
+  def __init__(self,
+               windowfn,
+               trigger=None,
+               accumulation_mode=None,
+               timestamp_combiner=None):
     """Initializes a WindowInto transform.
 
     Args:
-      windowfn: Function to be used for windowing
+      windowfn (Windowing, WindowFn): Function to be used for windowing.
       trigger: (optional) Trigger used for windowing, or None for default.
       accumulation_mode: (optional) Accumulation mode used for windowing,
           required for non-trivial triggers.
@@ -2080,19 +2085,14 @@ class WindowInto(ParDo):
       # Overlay windowing with kwargs.
       windowing = windowfn
       windowfn = windowing.windowfn
-      # Use windowing to fill in defaults for kwargs.
-      kwargs = dict(dict(
-          trigger=windowing.triggerfn,
-          accumulation_mode=windowing.accumulation_mode,
-          timestamp_combiner=windowing.timestamp_combiner), **kwargs)
-    # Use kwargs to simulate keyword-only arguments.
-    triggerfn = kwargs.pop('trigger', None)
-    accumulation_mode = kwargs.pop('accumulation_mode', None)
-    timestamp_combiner = kwargs.pop('timestamp_combiner', None)
-    if kwargs:
-      raise ValueError('Unexpected keyword arguments: %s' % list(kwargs))
+
+      # Use windowing to fill in defaults for the extra arguments.
+      trigger = trigger or windowing.triggerfn
+      accumulation_mode = accumulation_mode or windowing.accumulation_mode
+      timestamp_combiner = timestamp_combiner or windowing.timestamp_combiner
+
     self.windowing = Windowing(
-        windowfn, triggerfn, accumulation_mode, timestamp_combiner)
+        windowfn, trigger, accumulation_mode, timestamp_combiner)
     super(WindowInto, self).__init__(self.WindowIntoFn(self.windowing))
 
   def get_windowing(self, unused_inputs):
diff --git a/sdks/python/apache_beam/transforms/trigger.py b/sdks/python/apache_beam/transforms/trigger.py
index b0d9a25..ddf4c24 100644
--- a/sdks/python/apache_beam/transforms/trigger.py
+++ b/sdks/python/apache_beam/transforms/trigger.py
@@ -297,6 +297,7 @@ class AfterProcessingTime(TriggerFn):
   """
 
   def __init__(self, delay=0):
+    """Initialize a processing time trigger with a delay in seconds."""
     self.delay = delay
 
   def __repr__(self):
diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py
index 1990532..e477303 100644
--- a/sdks/python/apache_beam/transforms/window.py
+++ b/sdks/python/apache_beam/transforms/window.py
@@ -361,12 +361,22 @@ class FixedWindows(NonMergingWindowFn):
 
   Attributes:
     size: Size of the window as seconds.
-    offset: Offset of this window as seconds since Unix epoch. Windows start at
-      t=N * size + offset where t=0 is the epoch. The offset must be a value
-      in range [0, size). If it is not it will be normalized to this range.
+    offset: Offset of this window as seconds. Windows start at
+      t=N * size + offset where t=0 is the UNIX epoch. The offset must be a
+      value in range [0, size). If it is not it will be normalized to this
+      range.
   """
 
   def __init__(self, size, offset=0):
+    """Initialize a ``FixedWindows`` function for a given size and offset.
+
+    Args:
+      size (int): Size of the window in seconds.
+      offset (int): Offset of this window in seconds. Windows start at
+        t=N * size + offset where t=0 is the UNIX epoch. The offset must be a
+        value in range [0, size). If it is not it will be normalized to this
+        range.
+    """
     if size <= 0:
       raise ValueError('The size parameter must be strictly positive.')
     self.size = Duration.of(size)
diff --git a/sdks/python/apache_beam/utils/windowed_value.py b/sdks/python/apache_beam/utils/windowed_value.py
index a0b6622..5570c45 100644
--- a/sdks/python/apache_beam/utils/windowed_value.py
+++ b/sdks/python/apache_beam/utils/windowed_value.py
@@ -46,7 +46,13 @@ class PaneInfoTiming(object):
 
 
 class PaneInfo(object):
-  """Describes the trigger firing information for a given WindowedValue."""
+  """Describes the trigger firing information for a given WindowedValue.
+
+  "Panes" represent individual firings on a single window. ``PaneInfo``s are
+  passed downstream after trigger firings. They contain information about
+  whether it's an early/on time/late firing, if it's the last or first firing
+  from a window, and the index of the firing.
+  """
 
   def __init__(self, is_first, is_last, timing, index, nonspeculative_index):
     self._is_first = is_first
