This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 28818e3 [BEAM-2857] Implementing WriteToFiles transform for fileio (Python) (#8394) 28818e3 is described below commit 28818e32c078316951da5dbc346d1f918a153d1a Author: Pablo <pabl...@users.noreply.github.com> AuthorDate: Tue Jun 4 23:40:48 2019 -0700 [BEAM-2857] Implementing WriteToFiles transform for fileio (Python) (#8394) * [BEAM-2857] Implementing WriteToFiles transform for fileio (Python) --- sdks/python/apache_beam/io/fileio.py | 558 +++++++++++++++++++++++- sdks/python/apache_beam/io/fileio_test.py | 374 +++++++++++++++- sdks/python/apache_beam/io/filesystem.py | 5 +- sdks/python/apache_beam/testing/util.py | 19 + sdks/python/apache_beam/transforms/core.py | 30 +- sdks/python/apache_beam/transforms/trigger.py | 1 + sdks/python/apache_beam/transforms/window.py | 16 +- sdks/python/apache_beam/utils/windowed_value.py | 8 +- 8 files changed, 972 insertions(+), 39 deletions(-) diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py index 10890ca..a1a7a58 100644 --- a/sdks/python/apache_beam/io/fileio.py +++ b/sdks/python/apache_beam/io/fileio.py @@ -23,17 +23,88 @@ and its metadata; and ``ReadMatches``, which takes in a ``PCollection`` of file metadata records, and produces a ``PCollection`` of ``ReadableFile`` objects. These transforms currently do not support splitting by themselves. +Writing to Files +================ + +The transforms in this file include ``WriteToFiles``, which allows you to write +a ``beam.PCollection`` to files, and gives you many options to customize how to +do this. + +The ``WriteToFiles`` transform supports bounded and unbounded PCollections +(i.e. it can be used both batch and streaming pipelines). For streaming +pipelines, it currently does not have support for multiple trigger firings +on the same window. + +File Naming +----------- +One of the parameters received by ``WriteToFiles`` is a function specifying how +to name the files that are written. This is a function that takes in the +following parameters: + +- window +- pane +- shard_index +- total_shards +- compression +- destination + +It should return a file name that is unique for a combination of these +parameters. + +The default naming strategy is to name files +in the format +`$prefix-$start-$end-$pane-$shard-of-$numShards$suffix$compressionSuffix`, +where: + +- `$prefix` is, by default, `"output"`. +- `$start` and `$end` are the boundaries of the window for the data being + written. These are omitted if we're using the Global window. +- `$pane` is the index for the number of firing for a window. +- `$shard` and `$numShards` are the current shard number, and the total number + of shards for this window firing. +- `$suffix` is, by default, an empty string, but it can be set by the user via + ``default_file_naming``. + +Dynamic Destinations +-------------------- +If the elements in the input ``beam.PCollection`` can be partitioned into groups +that should be treated differently (e.g. some events are to be stored as CSV, +while some others are to be stored as Avro files), it is possible to do this +by passing a `destination` parameter to ``WriteToFiles``. 
Something like the +following:: + + my_pcollection | beam.io.fileio.WriteToFiles( + path='/my/file/path', + destination=lambda record: 'avro' if record['type'] == 'A' else 'csv', + sink=lambda dest: AvroSink() if dest == 'avro' else CsvSink(), + file_naming=beam.io.fileio.destination_prefix_naming()) + +In this transform, depending on the type of a record, it will be written down to +a destination named `'avro'`, or `'csv'`. The value returned by the +`destination` call is then passed to the `sink` call, to determine what sort of +sink will be used for each destination. The return type of the `destination` +parameter can be anything, as long as elements can be grouped by it. + No backward compatibility guarantees. Everything in this module is experimental. """ from __future__ import absolute_import +import collections +import logging +import random +import uuid + from past.builtins import unicode import apache_beam as beam from apache_beam.io import filesystem from apache_beam.io import filesystems from apache_beam.io.filesystem import BeamIOError +from apache_beam.options.pipeline_options import GoogleCloudOptions +from apache_beam.options.value_provider import StaticValueProvider +from apache_beam.options.value_provider import ValueProvider +from apache_beam.transforms.window import GlobalWindow from apache_beam.utils.annotations import experimental __all__ = ['EmptyMatchTreatment', @@ -148,10 +219,11 @@ class ReadableFile(object): self.metadata = metadata def open(self, mime_type='text/plain'): - return filesystems.FileSystems.open(self.metadata.path) + return filesystems.FileSystems.open(self.metadata.path, + mime_type=mime_type) def read(self): - return self.open().read() + return self.open('application/octet-stream').read() def read_utf8(self): return self.open().read().decode('utf-8') @@ -170,3 +242,485 @@ class ReadMatches(beam.PTransform): def expand(self, pcoll): return pcoll | beam.ParDo(_ReadMatchesFn(self._compression, self._skip_directories)) + + +class FileSink(object): + """Specifies how to write elements to individual files in ``WriteToFiles``. + + **NOTE: THIS CLASS IS EXPERIMENTAL.** + + A Sink class must implement the following: + + - The ``open`` method, which initializes writing to a file handler (it is not + responsible for opening the file handler itself). + - The ``write`` method, which writes an element to the file that was passed + in ``open``. + - The ``flush`` method, which flushes any buffered state. This is most often + called before closing a file (but not exclusively called in that + situation). The sink is not responsible for closing the file handler. + """ + + def open(self, fh): + raise NotImplementedError + + def write(self, record): + raise NotImplementedError + + def flush(self): + raise NotImplementedError + + +@beam.typehints.with_input_types(str) +class TextSink(FileSink): + """A sink that encodes utf8 elements, and writes to file handlers. + + **NOTE: THIS CLASS IS EXPERIMENTAL.** + + This sink simply calls file_handler.write(record.encode('utf8') + '\n') on all + records that come into it. 
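``TextSink`` above is the simplest implementation of the ``FileSink`` contract
(``open``/``write``/``flush``). As a minimal sketch of a custom sink following
the same contract (the ``JsonLinesSink`` name is illustrative, not part of this
change)::

    import json

    from apache_beam.io import fileio


    class JsonLinesSink(fileio.FileSink):
      """Writes each element as one JSON object per line."""

      def open(self, fh):
        # WriteToFiles creates and closes the file handle; the sink only
        # keeps a reference to it.
        self._fh = fh

      def write(self, record):
        self._fh.write(json.dumps(record).encode('utf8'))
        self._fh.write(b'\n')

      def flush(self):
        # Called before the file is closed; flush any buffered state here.
        self._fh.flush()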
+ """ + + def open(self, fh): + self._fh = fh + + def write(self, record): + self._fh.write(record.encode('utf8')) + self._fh.write(b'\n') + + def flush(self): + self._fh.flush() + + +def prefix_naming(prefix): + return default_file_naming(prefix) + + +_DEFAULT_FILE_NAME_TEMPLATE = ( + '{prefix}-{start}-{end}-{pane}-' + '{shard:05d}-{total_shards:05d}' + '{suffix}{compression}') + + +def destination_prefix_naming(): + + def _inner(window, pane, shard_index, total_shards, compression, destination): + kwargs = {'prefix': str(destination), + 'start': '', + 'end': '', + 'pane': '', + 'shard': 0, + 'total_shards': 0, + 'suffix': '', + 'compression': ''} + if total_shards is not None and shard_index is not None: + kwargs['shard'] = int(shard_index) + kwargs['total_shards'] = int(total_shards) + + if window != GlobalWindow(): + kwargs['start'] = window.start.to_utc_datetime().isoformat() + kwargs['end'] = window.end.to_utc_datetime().isoformat() + + # TODO(BEAM-3759): Add support for PaneInfo + # If the PANE is the ONLY firing in the window, we don't add it. + #if pane and not (pane.is_first and pane.is_last): + # kwargs['pane'] = pane.index + + if compression: + kwargs['compression'] = '.%s' % compression + + return _DEFAULT_FILE_NAME_TEMPLATE.format(**kwargs) + + return _inner + + +def default_file_naming(prefix, suffix=None): + + def _inner(window, pane, shard_index, total_shards, compression, destination): + kwargs = {'prefix': prefix, + 'start': '', + 'end': '', + 'pane': '', + 'shard': 0, + 'total_shards': 0, + 'suffix': '', + 'compression': ''} + if total_shards is not None and shard_index is not None: + kwargs['shard'] = int(shard_index) + kwargs['total_shards'] = int(total_shards) + + if window != GlobalWindow(): + kwargs['start'] = window.start.to_utc_datetime().isoformat() + kwargs['end'] = window.end.to_utc_datetime().isoformat() + + # TODO(pabloem): Add support for PaneInfo + # If the PANE is the ONLY firing in the window, we don't add it. + #if pane and not (pane.is_first and pane.is_last): + # kwargs['pane'] = pane.index + + if compression: + kwargs['compression'] = '.%s' % compression + if suffix: + kwargs['suffix'] = suffix + + return _DEFAULT_FILE_NAME_TEMPLATE.format(**kwargs) + + return _inner + + +_FileResult = collections.namedtuple('FileResult', + ['file_name', + 'shard_index', + 'total_shards', + 'window', + 'pane', + 'destination']) + + +# Adding a class to contain PyDoc. +class FileResult(_FileResult): + """A descriptor of a file that has been written.""" + pass + + +@experimental() +class WriteToFiles(beam.PTransform): + """Write the incoming PCollection to a set of output files. + + The incoming ``PCollection`` may be bounded or unbounded. + + **Note:** For unbounded ``PCollection``s, this transform does not support + multiple firings per Window (due to the fact that files are named only by + their destination, and window, at the moment). + """ + + # We allow up to 20 different destinations to be written in a single bundle. + # Too many files will add memory pressure to the worker, so we let it be 20. + MAX_NUM_WRITERS_PER_BUNDLE = 20 + + DEFAULT_SHARDING = 5 + + def __init__(self, + path, + file_naming=None, + destination=None, + temp_directory=None, + sink=None, + shards=None, + output_fn=None, + max_writers_per_bundle=MAX_NUM_WRITERS_PER_BUNDLE): + """Initializes a WriteToFiles transform. + + Args: + path (str, ValueProvider): The directory to write files into. 
+ file_naming (callable): A callable that takes in a window, pane, + shard_index, total_shards and compression; and returns a file name. + destination (callable): If this argument is provided, the sink parameter + must also be a callable. + temp_directory (str, ValueProvider): To ensure atomicity in the transform, + the output is written into temporary files, which are written to a + directory that is meant to be temporary as well. Once the whole output + has been written, the files are moved into their final destination, and + given their final names. By default, the temporary directory will be + within the temp_location of your pipeline. + sink (callable, FileSink): The sink to use to write into a file. It should + implement the methods of a ``FileSink``. If none is provided, a + ``TextSink`` is used. + shards (int): The number of shards per destination and trigger firing. + max_writers_per_bundle (int): The number of writers that can be open + concurrently in a single worker that's processing one bundle. + """ + self.path = ( + path if isinstance(path, ValueProvider) else StaticValueProvider(str, + path)) + self.file_naming_fn = file_naming or default_file_naming('output') + self.destination_fn = self._get_destination_fn(destination) + self._temp_directory = temp_directory + self.sink_fn = self._get_sink_fn(sink) + self.shards = shards or WriteToFiles.DEFAULT_SHARDING + self.output_fn = output_fn or (lambda x: x) + + self._max_num_writers_per_bundle = max_writers_per_bundle + + @staticmethod + def _get_sink_fn(input_sink): + if isinstance(input_sink, FileSink): + return lambda x: input_sink + elif callable(input_sink): + return input_sink + else: + return lambda x: TextSink() + + @staticmethod + def _get_destination_fn(destination): + if isinstance(destination, ValueProvider): + return lambda elm: destination.get() + elif callable(destination): + return destination + else: + return lambda elm: destination + + def expand(self, pcoll): + p = pcoll.pipeline + + if not self._temp_directory: + temp_location = ( + p.options.view_as(GoogleCloudOptions).temp_location + or self.path.get()) + dir_uid = str(uuid.uuid4()) + self._temp_directory = StaticValueProvider( + str, + filesystems.FileSystems.join(temp_location, + '.temp%s' % dir_uid)) + logging.info('Added temporary directory %s', self._temp_directory.get()) + + output = (pcoll + | beam.ParDo(_WriteUnshardedRecordsFn( + base_path=self._temp_directory, + destination_fn=self.destination_fn, + sink_fn=self.sink_fn, + max_writers_per_bundle=self._max_num_writers_per_bundle)) + .with_outputs(_WriteUnshardedRecordsFn.SPILLED_RECORDS, + _WriteUnshardedRecordsFn.WRITTEN_FILES)) + + written_files_pc = output[_WriteUnshardedRecordsFn.WRITTEN_FILES] + spilled_records_pc = output[_WriteUnshardedRecordsFn.SPILLED_RECORDS] + + more_written_files_pc = ( + spilled_records_pc + | beam.ParDo(_AppendShardedDestination(self.destination_fn, + self.shards)) + | "GroupRecordsByDestinationAndShard" >> beam.GroupByKey() + | beam.ParDo(_WriteShardedRecordsFn(self._temp_directory, + self.sink_fn, + self.shards)) + ) + + files_by_destination_pc = ( + (written_files_pc, more_written_files_pc) + | beam.Flatten() + | beam.Map(lambda file_result: (file_result.destination, file_result)) + | "GroupTempFilesByDestination" >> beam.GroupByKey()) + + # Now we should take the temporary files, and write them to the final + # destination, with their proper names. 
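The ``file_naming`` callable described in the constructor above uses the same
signature as ``default_file_naming`` and ``destination_prefix_naming``. A
minimal sketch of a custom naming function (the names and path here are
illustrative, not part of this change)::

    def per_destination_naming(window, pane, shard_index, total_shards,
                               compression, destination):
      # shard_index and total_shards may be None when sharding is unknown.
      return '%s-%05d-of-%05d.txt' % (destination or 'output',
                                      shard_index or 0,
                                      total_shards or 0)

    # my_pcollection | fileio.WriteToFiles(
    #     path='/my/file/path', file_naming=per_destination_naming)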
+ + file_results = (files_by_destination_pc + | beam.ParDo( + _MoveTempFilesIntoFinalDestinationFn( + self.path, self.file_naming_fn, + self._temp_directory))) + + return file_results + + +def _create_writer(base_path, writer_key): + try: + filesystems.FileSystems.mkdirs(base_path) + except IOError: + # Directory already exists. + pass + + # The file name has a prefix determined by destination+window, along with + # a random string. This allows us to retrieve orphaned files later on. + file_name = '%s_%s' % (abs(hash(writer_key)), uuid.uuid4()) + full_file_name = filesystems.FileSystems.join(base_path, file_name) + return full_file_name, filesystems.FileSystems.create(full_file_name) + + +class _MoveTempFilesIntoFinalDestinationFn(beam.DoFn): + + def __init__(self, path, file_naming_fn, temp_dir): + self.path = path + self.file_naming_fn = file_naming_fn + self.temporary_directory = temp_dir + + def process(self, + element, + w=beam.DoFn.WindowParam): + destination = element[0] + file_results = list(element[1]) + + for i, r in enumerate(file_results): + # TODO(pabloem): Handle compression for files. + final_file_name = self.file_naming_fn(r.window, + r.pane, + i, + len(file_results), + '', + destination) + + logging.info('Moving temporary file %s to dir: %s as %s. Res: %s', + r.file_name, self.path.get(), final_file_name, r) + + final_full_path = filesystems.FileSystems.join(self.path.get(), + final_file_name) + + # TODO(pabloem): Batch rename requests? + try: + filesystems.FileSystems.rename([r.file_name], + [final_full_path]) + except BeamIOError: + # This error is not serious, because it may happen on a retry of the + # bundle. We simply log it. + logging.debug('File %s failed to be copied. This may be due to a bundle' + ' being retried.', r.file_name) + + yield FileResult(final_file_name, + i, + len(file_results), + r.window, + r.pane, + destination) + + logging.info('Cautiously removing temporary files for' + ' destination %s and window %s', destination, w) + writer_key = (destination, w) + self._remove_temporary_files(writer_key) + + def _remove_temporary_files(self, writer_key): + try: + prefix = filesystems.FileSystems.join( + self.temporary_directory.get(), str(abs(hash(writer_key)))) + match_result = filesystems.FileSystems.match(['%s*' % prefix]) + orphaned_files = [m.path for m in match_result[0].metadata_list] + + logging.debug('Deleting orphaned files: %s', orphaned_files) + filesystems.FileSystems.delete(orphaned_files) + except BeamIOError as e: + logging.debug('Exceptions when deleting files: %s', e) + + +class _WriteShardedRecordsFn(beam.DoFn): + + def __init__(self, base_path, sink_fn, shards): + self.base_path = base_path + self.sink_fn = sink_fn + self.shards = shards + + def process(self, + element, + w=beam.DoFn.WindowParam, + pane=beam.DoFn.PaneInfoParam): + destination_and_shard = element[0] + destination = destination_and_shard[0] + shard = destination_and_shard[1] + records = element[1] + + full_file_name, writer = _create_writer(base_path=self.base_path.get(), + writer_key=(destination, w)) + sink = self.sink_fn(destination) + sink.open(writer) + + for r in records: + sink.write(r) + + sink.flush() + writer.close() + + logging.info('Writing file %s for destination %s and shard %s', + full_file_name, destination, repr(shard)) + + yield FileResult(full_file_name, + shard_index=shard, + total_shards=self.shards, + window=w, + pane=pane, + destination=destination) + + +class _AppendShardedDestination(beam.DoFn): + + def __init__(self, destination, shards): + 
self.destination_fn = destination + self.shards = shards + + # We start the shards for a single destination at an arbitrary point. + self._shard_counter = collections.defaultdict( + lambda: random.randrange(self.shards)) + + def _next_shard_for_destination(self, destination): + self._shard_counter[destination] = ( + (self._shard_counter[destination] + 1) % self.shards) + + return self._shard_counter[destination] + + def process(self, record): + destination = self.destination_fn(record) + shard = self._next_shard_for_destination(destination) + + yield ((destination, shard), record) + + +class _WriteUnshardedRecordsFn(beam.DoFn): + + SPILLED_RECORDS = 'spilled_records' + WRITTEN_FILES = 'written_files' + + def __init__(self, + base_path, + destination_fn, + sink_fn, + max_writers_per_bundle=WriteToFiles.MAX_NUM_WRITERS_PER_BUNDLE): + self.base_path = base_path + self.destination_fn = destination_fn + self.sink_fn = sink_fn + self.max_num_writers_per_bundle = max_writers_per_bundle + + def start_bundle(self): + self._writers_and_sinks = {} + self._file_names = {} + + def process(self, + record, + w=beam.DoFn.WindowParam, + pane=beam.DoFn.PaneInfoParam): + destination = self.destination_fn(record) + + writer, sink = self._get_or_create_writer_and_sink(destination, w) + + if not writer: + return [beam.pvalue.TaggedOutput(self.SPILLED_RECORDS, record)] + else: + sink.write(record) + + def _get_or_create_writer_and_sink(self, destination, window): + """Returns a tuple of writer, sink.""" + writer_key = (destination, window) + + if writer_key in self._writers_and_sinks: + return self._writers_and_sinks.get(writer_key) + elif len(self._writers_and_sinks) >= self.max_num_writers_per_bundle: + # The writer does not exist, and we have too many writers already. + return None, None + else: + # The writer does not exist, but we can still create a new one. 
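When the writer cap above is reached, records are emitted to the
``SPILLED_RECORDS`` output and written through the sharded path instead. From
the user's side this is controlled by ``max_writers_per_bundle``; a usage
sketch (the path is illustrative), forcing every record through the grouped,
sharded writers as some of the tests below do::

    my_pcollection | fileio.WriteToFiles(
        path='/my/file/path',
        shards=3,                   # files per destination and window firing
        max_writers_per_bundle=0)   # zero writers per bundle: spill everything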
+ full_file_name, writer = _create_writer(base_path=self.base_path.get(), + writer_key=writer_key) + sink = self.sink_fn(destination) + + sink.open(writer) + + self._writers_and_sinks[writer_key] = (writer, sink) + self._file_names[writer_key] = full_file_name + return self._writers_and_sinks[writer_key] + + def finish_bundle(self): + for key, (writer, sink) in self._writers_and_sinks.items(): + + sink.flush() + writer.close() + + file_result = FileResult(self._file_names[key], + shard_index=-1, + total_shards=0, + window=key[1], + pane=None, # TODO(pabloem): get the pane info + destination=key[0]) + + yield beam.pvalue.TaggedOutput( + self.WRITTEN_FILES, + beam.transforms.window.WindowedValue( + file_result, + timestamp=key[1].start, + windows=[key[1]] # TODO(pabloem) HOW DO WE GET THE PANE + )) diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py index 096149b..c533ef8 100644 --- a/sdks/python/apache_beam/io/fileio_test.py +++ b/sdks/python/apache_beam/io/fileio_test.py @@ -21,20 +21,38 @@ from __future__ import absolute_import import csv import io +import json import logging import os import sys import unittest +import uuid +from hamcrest.library.text import stringmatches from nose.plugins.attrib import attr import apache_beam as beam from apache_beam.io import fileio from apache_beam.io.filebasedsink_test import _TestCaseWithTempDirCleanUp +from apache_beam.io.filesystems import FileSystems +from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.options.pipeline_options import StandardOptions from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.test_stream import TestStream from apache_beam.testing.test_utils import compute_hash from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to +from apache_beam.testing.util import matches_all +from apache_beam.transforms import trigger +from apache_beam.transforms.window import FixedWindows +from apache_beam.transforms.window import GlobalWindow + + +def _get_file_reader(readable_file): + if sys.version_info >= (3, 0): + return io.TextIOWrapper(readable_file.open()) + else: + return readable_file.open() class MatchTest(_TestCaseWithTempDirCleanUp): @@ -48,7 +66,9 @@ class MatchTest(_TestCaseWithTempDirCleanUp): files.append(self._create_temp_file(dir=tempdir)) with TestPipeline() as p: - files_pc = p | fileio.MatchFiles(tempdir) | beam.Map(lambda x: x.path) + files_pc = (p + | fileio.MatchFiles(FileSystems.join(tempdir, '*')) + | beam.Map(lambda x: x.path)) assert_that(files_pc, equal_to(files)) @@ -66,7 +86,7 @@ class MatchTest(_TestCaseWithTempDirCleanUp): with TestPipeline() as p: files_pc = (p - | beam.Create(directories) + | beam.Create([FileSystems.join(d, '*') for d in directories]) | fileio.MatchAll() | beam.Map(lambda x: x.path)) @@ -85,7 +105,7 @@ class MatchTest(_TestCaseWithTempDirCleanUp): with TestPipeline() as p: files_pc = ( p - | beam.Create(directories) + | beam.Create([FileSystems.join(d, '*') for d in directories]) | fileio.MatchAll(fileio.EmptyMatchTreatment.DISALLOW) | beam.Map(lambda x: x.path)) @@ -103,7 +123,7 @@ class MatchTest(_TestCaseWithTempDirCleanUp): with TestPipeline() as p: files_pc = ( p - | beam.Create(['%s*' % d for d in directories]) + | beam.Create([FileSystems.join(d, '*') for d in directories]) | fileio.MatchAll(fileio.EmptyMatchTreatment.ALLOW_IF_WILDCARD) | beam.Map(lambda x: x.path)) @@ -119,7 +139,7 @@ class ReadTest(_TestCaseWithTempDirCleanUp): with 
TestPipeline() as p: content_pc = (p - | beam.Create([dir]) + | beam.Create([FileSystems.join(dir, '*')]) | fileio.MatchAll() | fileio.ReadMatches() | beam.FlatMap( @@ -134,18 +154,12 @@ class ReadTest(_TestCaseWithTempDirCleanUp): dir = '%s%s' % (self._new_tempdir(), os.sep) self._create_temp_file(dir=dir, content=content) - def get_csv_reader(readable_file): - if sys.version_info >= (3, 0): - return csv.reader(io.TextIOWrapper(readable_file.open())) - else: - return csv.reader(readable_file.open()) - with TestPipeline() as p: content_pc = (p - | beam.Create([dir]) + | beam.Create([FileSystems.join(dir, '*')]) | fileio.MatchAll() | fileio.ReadMatches() - | beam.FlatMap(get_csv_reader)) + | beam.FlatMap(lambda rf: csv.reader(_get_file_reader(rf)))) assert_that(content_pc, equal_to(rows)) @@ -160,7 +174,7 @@ class ReadTest(_TestCaseWithTempDirCleanUp): with TestPipeline() as p: contents_pc = (p - | beam.Create(files + [tempdir]) + | beam.Create(files + ['%s/' % tempdir]) | fileio.ReadMatches() | beam.FlatMap( lambda x: x.read().decode('utf-8').splitlines())) @@ -179,7 +193,7 @@ class ReadTest(_TestCaseWithTempDirCleanUp): with self.assertRaises(beam.io.filesystem.BeamIOError): with TestPipeline() as p: _ = (p - | beam.Create(files + [tempdir]) + | beam.Create(files + ['%s/' % tempdir]) | fileio.ReadMatches(skip_directories=False) | beam.Map(lambda x: x.read_utf8())) @@ -233,6 +247,336 @@ class MatchIntegrationTest(unittest.TestCase): label='Assert Checksums') +class WriteFilesTest(_TestCaseWithTempDirCleanUp): + + SIMPLE_COLLECTION = [ + {'project': 'beam', 'foundation': 'apache'}, + {'project': 'prometheus', 'foundation': 'cncf'}, + {'project': 'flink', 'foundation': 'apache'}, + {'project': 'grpc', 'foundation': 'cncf'}, + {'project': 'spark', 'foundation': 'apache'}, + {'project': 'kubernetes', 'foundation': 'cncf'}, + {'project': 'spark', 'foundation': 'apache'}, + {'project': 'knative', 'foundation': 'cncf'}, + {'project': 'linux', 'foundation': 'linux'}, + ] + + LARGER_COLLECTION = ['{:05d}'.format(i) for i in range(200)] + + CSV_HEADERS = ['project', 'foundation'] + + SIMPLE_COLLECTION_VALIDATION_SET = set([ + (elm['project'], elm['foundation']) for elm in SIMPLE_COLLECTION]) + + class CsvSink(fileio.TextSink): + def __init__(self, headers): + self.headers = headers + + def write(self, record): + self._fh.write(','.join([record[h] for h in self.headers]).encode('utf8')) + self._fh.write('\n'.encode('utf8')) + + class JsonSink(fileio.TextSink): + + def write(self, record): + self._fh.write(json.dumps(record).encode('utf8')) + self._fh.write('\n'.encode('utf8')) + + def test_write_to_single_file_batch(self): + + dir = self._new_tempdir() + + with TestPipeline() as p: + _ = (p + | beam.Create(WriteFilesTest.SIMPLE_COLLECTION) + | "Serialize" >> beam.Map(json.dumps) + | beam.io.fileio.WriteToFiles(path=dir)) + + with TestPipeline() as p: + result = (p + | fileio.MatchFiles(FileSystems.join(dir, '*')) + | fileio.ReadMatches() + | beam.FlatMap(lambda f: f.read_utf8().strip().split('\n'))) + + assert_that(result, + equal_to([json.dumps(row) for row in self.SIMPLE_COLLECTION])) + + def test_write_to_different_file_types_some_spilling(self): + + dir = self._new_tempdir() + + with TestPipeline() as p: + _ = (p + | beam.Create(WriteFilesTest.SIMPLE_COLLECTION) + | beam.io.fileio.WriteToFiles( + path=dir, + destination=lambda record: record['foundation'], + sink=lambda dest: ( + WriteFilesTest.CsvSink(WriteFilesTest.CSV_HEADERS) + if dest == 'apache' else WriteFilesTest.JsonSink()), + 
file_naming=fileio.destination_prefix_naming(), + max_writers_per_bundle=1)) + + with TestPipeline() as p: + cncf_res = (p + | fileio.MatchFiles(FileSystems.join(dir, 'cncf*')) + | fileio.ReadMatches() + | beam.FlatMap(lambda f: f.read_utf8().strip().split('\n'))) + + apache_res = (p + | "MatchApache" >> fileio.MatchFiles( + FileSystems.join(dir, 'apache*')) + | "ReadApache" >> fileio.ReadMatches() + | "MapApache" >> beam.FlatMap( + lambda rf: csv.reader(_get_file_reader(rf)))) + + assert_that(cncf_res, + equal_to([json.dumps(row) + for row in self.SIMPLE_COLLECTION + if row['foundation'] == 'cncf']), + label='verifyCNCF') + + assert_that(apache_res, + equal_to([[row['project'], row['foundation']] + for row in self.SIMPLE_COLLECTION + if row['foundation'] == 'apache']), + label='verifyApache') + + def test_find_orphaned_files(self): + dir = self._new_tempdir() + + write_transform = beam.io.fileio.WriteToFiles(path=dir) + + def write_orphaned_file(temp_dir, writer_key): + temp_dir_path = FileSystems.join(dir, temp_dir) + + file_prefix_dir = FileSystems.join( + temp_dir_path, + str(abs(hash(writer_key)))) + + file_name = '%s_%s' % (file_prefix_dir, uuid.uuid4()) + with FileSystems.create(file_name) as f: + f.write(b'Hello y\'all') + + return file_name + + with TestPipeline() as p: + _ = (p + | beam.Create(WriteFilesTest.SIMPLE_COLLECTION) + | "Serialize" >> beam.Map(json.dumps) + | write_transform) + + # Pre-create the temp directory. + temp_dir_path = FileSystems.mkdirs(FileSystems.join( + dir, write_transform._temp_directory.get())) + write_orphaned_file(write_transform._temp_directory.get(), + (None, GlobalWindow())) + f2 = write_orphaned_file(write_transform._temp_directory.get(), + ('other-dest', GlobalWindow())) + + temp_dir_path = FileSystems.join(dir, write_transform._temp_directory.get()) + leftovers = FileSystems.match(['%s%s*' % (temp_dir_path, os.sep)]) + found_files = [m.path for m in leftovers[0].metadata_list] + self.assertListEqual(found_files, [f2]) + + def test_write_to_different_file_types(self): + + dir = self._new_tempdir() + + with TestPipeline() as p: + _ = (p + | beam.Create(WriteFilesTest.SIMPLE_COLLECTION) + | beam.io.fileio.WriteToFiles( + path=dir, + destination=lambda record: record['foundation'], + sink=lambda dest: ( + WriteFilesTest.CsvSink(WriteFilesTest.CSV_HEADERS) + if dest == 'apache' else WriteFilesTest.JsonSink()), + file_naming=fileio.destination_prefix_naming())) + + with TestPipeline() as p: + cncf_res = (p + | fileio.MatchFiles(FileSystems.join(dir, 'cncf*')) + | fileio.ReadMatches() + | beam.FlatMap(lambda f: f.read_utf8().strip().split('\n'))) + + apache_res = (p + | "MatchApache" >> fileio.MatchFiles( + FileSystems.join(dir, 'apache*')) + | "ReadApache" >> fileio.ReadMatches() + | "MapApache" >> beam.FlatMap( + lambda rf: csv.reader(_get_file_reader(rf)))) + + assert_that(cncf_res, + equal_to([json.dumps(row) + for row in self.SIMPLE_COLLECTION + if row['foundation'] == 'cncf']), + label='verifyCNCF') + + assert_that(apache_res, + equal_to([[row['project'], row['foundation']] + for row in self.SIMPLE_COLLECTION + if row['foundation'] == 'apache']), + label='verifyApache') + + def record_dofn(self): + class RecordDoFn(beam.DoFn): + def process(self, element): + WriteFilesTest.all_records.append(element) + + return RecordDoFn() + + def test_streaming_complex_timing(self): + # Use state on the TestCase class, since other references would be pickled + # into a closure and not have the desired side effects. 
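The ``CsvSink`` and ``JsonSink`` helpers above subclass ``fileio.TextSink`` and
only override ``write``, reusing the file handle that ``TextSink.open`` stores
on ``self._fh``. A similar minimal sink (the ``TsvSink`` name is illustrative,
not part of this change)::

    from apache_beam.io import fileio

    class TsvSink(fileio.TextSink):
      """Writes one tab-separated line per record."""

      def __init__(self, headers):
        self.headers = headers

      def write(self, record):
        line = '\t'.join(str(record[h]) for h in self.headers)
        self._fh.write(line.encode('utf8'))
        self._fh.write(b'\n')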
+ # + # TODO(BEAM-5295): Use assert_that after it works for the cases here in + # streaming mode. + WriteFilesTest.all_records = [] + + dir = self._new_tempdir() + + # Setting up the input (TestStream) + ts = TestStream().advance_watermark_to(0) + for elm in WriteFilesTest.LARGER_COLLECTION: + timestamp = int(elm) + + ts.add_elements([('key', '%s' % elm)]) + if timestamp % 5 == 0 and timestamp != 0: + # TODO(BEAM-3759): Add many firings per window after getting PaneInfo. + ts.advance_processing_time(5) + ts.advance_watermark_to(timestamp) + + # The pipeline that we are testing + options = PipelineOptions() + options.view_as(StandardOptions).streaming = True + with TestPipeline(options=options) as p: + res = (p + | ts + | beam.WindowInto( + FixedWindows(10), + trigger=trigger.AfterWatermark(), + accumulation_mode=trigger.AccumulationMode.DISCARDING) + | beam.GroupByKey() + | beam.FlatMap(lambda x: x[1])) + # Triggering after 5 processing-time seconds, and on the watermark. Also + # discarding old elements. + + _ = (res + | beam.io.fileio.WriteToFiles(path=dir, + max_writers_per_bundle=0) + | beam.Map(lambda fr: FileSystems.join(dir, fr.file_name)) + | beam.ParDo(self.record_dofn())) + + # Verification pipeline + with TestPipeline() as p: + files = (p | beam.io.fileio.MatchFiles(FileSystems.join(dir, '*'))) + + file_names = (files | beam.Map(lambda fm: fm.path)) + + file_contents = ( + files + | beam.io.fileio.ReadMatches() + | beam.Map(lambda rf: (rf.metadata.path, + rf.read_utf8().strip().split('\n')))) + + content = (file_contents + | beam.FlatMap(lambda fc: [ln.strip() for ln in fc[1]])) + + assert_that(file_names, equal_to(WriteFilesTest.all_records), + label='AssertFilesMatch') + assert_that(content, matches_all(WriteFilesTest.LARGER_COLLECTION), + label='AssertContentsMatch') + + def test_streaming_different_file_types(self): + dir = self._new_tempdir() + input = iter(WriteFilesTest.SIMPLE_COLLECTION) + ts = (TestStream() + .advance_watermark_to(0) + .add_elements([next(input), next(input)]) + .advance_watermark_to(10) + .add_elements([next(input), next(input)]) + .advance_watermark_to(20) + .add_elements([next(input), next(input)]) + .advance_watermark_to(30) + .add_elements([next(input), next(input)]) + .advance_watermark_to(40)) + + with TestPipeline() as p: + _ = (p + | ts + | beam.WindowInto(FixedWindows(10)) + | beam.io.fileio.WriteToFiles( + path=dir, + destination=lambda record: record['foundation'], + sink=lambda dest: ( + WriteFilesTest.CsvSink(WriteFilesTest.CSV_HEADERS) + if dest == 'apache' else WriteFilesTest.JsonSink()), + file_naming=fileio.destination_prefix_naming(), + max_writers_per_bundle=0, + )) + + with TestPipeline() as p: + cncf_files = (p + | fileio.MatchFiles(FileSystems.join(dir, 'cncf*')) + | "CncfFileNames" >> beam.Map(lambda fm: fm.path)) + + apache_files = (p + | "MatchApache" >> fileio.MatchFiles( + FileSystems.join(dir, 'apache*')) + | "ApacheFileNames" >> beam.Map(lambda fm: fm.path)) + + assert_that(cncf_files, + matches_all([ + stringmatches.matches_regexp( + FileSystems.join( + dir, + 'cncf-1970-01-01T00:00:00-1970-01-01T00:00:10--.*' + ) + ), + stringmatches.matches_regexp( + FileSystems.join( + dir, + 'cncf-1970-01-01T00:00:10-1970-01-01T00:00:20--.*' + ) + ), + stringmatches.matches_regexp( + FileSystems.join( + dir, + 'cncf-1970-01-01T00:00:20-1970-01-01T00:00:30--.*' + ) + ), + stringmatches.matches_regexp( + FileSystems.join( + dir, + 'cncf-1970-01-01T00:00:30-1970-01-01T00:00:40--.*' + ) + ) + ]), + label='verifyCNCFFiles') + + 
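These expected patterns follow the ``destination_prefix_naming`` template
``{prefix}-{start}-{end}-{pane}-{shard:05d}-{total_shards:05d}{suffix}{compression}``;
the pane field is currently left empty (BEAM-3759), which is why the names
contain a double dash. For a 10-second window starting at the epoch, a produced
name looks roughly like the following (the shard digits depend on how many
temporary files were written)::

    cncf-1970-01-01T00:00:00-1970-01-01T00:00:10--00000-00001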
assert_that(apache_files, + matches_all([ + stringmatches.matches_regexp(FileSystems.join( + dir, + 'apache-1970-01-01T00:00:00-1970-01-01T00:00:10--.*') + ), + stringmatches.matches_regexp(FileSystems.join( + dir, + 'apache-1970-01-01T00:00:10-1970-01-01T00:00:20--.*') + ), + stringmatches.matches_regexp(FileSystems.join( + dir, + 'apache-1970-01-01T00:00:20-1970-01-01T00:00:30--.*') + ), + stringmatches.matches_regexp(FileSystems.join( + dir, + 'apache-1970-01-01T00:00:30-1970-01-01T00:00:40--.*') + ) + ]), + label='verifyApacheFiles') + + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) unittest.main() diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py index cfdf472..efc745a 100644 --- a/sdks/python/apache_beam/io/filesystem.py +++ b/sdks/python/apache_beam/io/filesystem.py @@ -400,8 +400,7 @@ class CompressedFile(object): class FileMetadata(object): - """Metadata about a file path that is the output of FileSystem.match - """ + """Metadata about a file path that is the output of FileSystem.match.""" def __init__(self, path, size_in_bytes): assert isinstance(path, (str, unicode)) and path, "Path should be a string" assert isinstance(size_in_bytes, (int, long)) and size_in_bytes >= 0, \ @@ -430,7 +429,7 @@ class FileMetadata(object): class MatchResult(object): """Result from the ``FileSystem`` match operation which contains the list - of matched FileMetadata. + of matched ``FileMetadata``. """ def __init__(self, pattern, metadata_list): self.metadata_list = metadata_list diff --git a/sdks/python/apache_beam/testing/util.py b/sdks/python/apache_beam/testing/util.py index 3099b0f..2e38d37 100644 --- a/sdks/python/apache_beam/testing/util.py +++ b/sdks/python/apache_beam/testing/util.py @@ -25,6 +25,9 @@ import io import tempfile from builtins import object +from hamcrest.core import assert_that as hamcrest_assert +from hamcrest.library.collection import contains_inanyorder + from apache_beam import pvalue from apache_beam.transforms import window from apache_beam.transforms.core import Create @@ -41,6 +44,7 @@ __all__ = [ 'equal_to', 'is_empty', 'is_not_empty', + 'matches_all', # open_shards is internal and has no backwards compatibility guarantees. 'open_shards', 'TestWindowedValue', @@ -142,6 +146,21 @@ def equal_to(expected): return _equal +def matches_all(expected): + """Matcher used by assert_that to check a set of matchers. + + Args: + expected: A list of elements or hamcrest matchers to be used to match + the elements of a single PCollection. 
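For example, ``matches_all`` accepts a mix of plain values and hamcrest
matchers (a sketch; ``output_pcoll`` stands in for any ``PCollection``)::

    from hamcrest.library.text import stringmatches

    assert_that(
        output_pcoll,
        matches_all(['exact-element',
                     stringmatches.matches_regexp('out-.*')]))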
+ """ + def _matches(actual): + expected_list = list(expected) + + hamcrest_assert(actual, contains_inanyorder(*expected_list)) + + return _matches + + def is_empty(): def _empty(actual): actual = list(actual) diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index ca1fb46..ead094b 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -420,11 +420,12 @@ class DoFn(WithTypeHints, HasDisplayData, urns.RunnerApiFn): SideInputParam = _DoFnParam('SideInputParam') TimestampParam = _DoFnParam('TimestampParam') WindowParam = _DoFnParam('WindowParam') + PaneInfoParam = _DoFnParam('PaneInfoParam') WatermarkReporterParam = _DoFnParam('WatermarkReporterParam') BundleFinalizerParam = _BundleFinalizerParam DoFnProcessParams = [ElementParam, SideInputParam, TimestampParam, - WindowParam, WatermarkReporterParam, + WindowParam, WatermarkReporterParam, PaneInfoParam, BundleFinalizerParam] # Parameters to access state and timers. Not restricted to use only in the @@ -2065,11 +2066,15 @@ class WindowInto(ParDo): new_windows = self.windowing.windowfn.assign(context) yield WindowedValue(element, context.timestamp, new_windows) - def __init__(self, windowfn, **kwargs): + def __init__(self, + windowfn, + trigger=None, + accumulation_mode=None, + timestamp_combiner=None): """Initializes a WindowInto transform. Args: - windowfn: Function to be used for windowing + windowfn (Windowing, WindowFn): Function to be used for windowing. trigger: (optional) Trigger used for windowing, or None for default. accumulation_mode: (optional) Accumulation mode used for windowing, required for non-trivial triggers. @@ -2080,19 +2085,14 @@ class WindowInto(ParDo): # Overlay windowing with kwargs. windowing = windowfn windowfn = windowing.windowfn - # Use windowing to fill in defaults for kwargs. - kwargs = dict(dict( - trigger=windowing.triggerfn, - accumulation_mode=windowing.accumulation_mode, - timestamp_combiner=windowing.timestamp_combiner), **kwargs) - # Use kwargs to simulate keyword-only arguments. - triggerfn = kwargs.pop('trigger', None) - accumulation_mode = kwargs.pop('accumulation_mode', None) - timestamp_combiner = kwargs.pop('timestamp_combiner', None) - if kwargs: - raise ValueError('Unexpected keyword arguments: %s' % list(kwargs)) + + # Use windowing to fill in defaults for the extra arguments. 
+ trigger = trigger or windowing.triggerfn + accumulation_mode = accumulation_mode or windowing.accumulation_mode + timestamp_combiner = timestamp_combiner or windowing.timestamp_combiner + self.windowing = Windowing( - windowfn, triggerfn, accumulation_mode, timestamp_combiner) + windowfn, trigger, accumulation_mode, timestamp_combiner) super(WindowInto, self).__init__(self.WindowIntoFn(self.windowing)) def get_windowing(self, unused_inputs): diff --git a/sdks/python/apache_beam/transforms/trigger.py b/sdks/python/apache_beam/transforms/trigger.py index b0d9a25..ddf4c24 100644 --- a/sdks/python/apache_beam/transforms/trigger.py +++ b/sdks/python/apache_beam/transforms/trigger.py @@ -297,6 +297,7 @@ class AfterProcessingTime(TriggerFn): """ def __init__(self, delay=0): + """Initialize a processing time trigger with a delay in seconds.""" self.delay = delay def __repr__(self): diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py index 1990532..e477303 100644 --- a/sdks/python/apache_beam/transforms/window.py +++ b/sdks/python/apache_beam/transforms/window.py @@ -361,12 +361,22 @@ class FixedWindows(NonMergingWindowFn): Attributes: size: Size of the window as seconds. - offset: Offset of this window as seconds since Unix epoch. Windows start at - t=N * size + offset where t=0 is the epoch. The offset must be a value - in range [0, size). If it is not it will be normalized to this range. + offset: Offset of this window as seconds. Windows start at + t=N * size + offset where t=0 is the UNIX epoch. The offset must be a + value in range [0, size). If it is not it will be normalized to this + range. """ def __init__(self, size, offset=0): + """Initialize a ``FixedWindows`` function for a given size and offset. + + Args: + size (int): Size of the window in seconds. + offset(int): Offset of this window as seconds. Windows start at + t=N * size + offset where t=0 is the UNIX epoch. The offset must be a + value in range [0, size). If it is not it will be normalized to this + range. + """ if size <= 0: raise ValueError('The size parameter must be strictly positive.') self.size = Duration.of(size) diff --git a/sdks/python/apache_beam/utils/windowed_value.py b/sdks/python/apache_beam/utils/windowed_value.py index a0b6622..5570c45 100644 --- a/sdks/python/apache_beam/utils/windowed_value.py +++ b/sdks/python/apache_beam/utils/windowed_value.py @@ -46,7 +46,13 @@ class PaneInfoTiming(object): class PaneInfo(object): - """Describes the trigger firing information for a given WindowedValue.""" + """Describes the trigger firing information for a given WindowedValue. + + "Panes" represent individual firings on a single window. ``PaneInfo``s are + passed downstream after trigger firings. They contain information about + whether it's an early/on time/late firing, if it's the last or first firing + from a window, and the index of the firing. + """ def __init__(self, is_first, is_last, timing, index, nonspeculative_index): self._is_first = is_first
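The ``DoFn.PaneInfoParam`` added in core.py exposes this ``PaneInfo`` to
``process``, in the same way ``_WriteShardedRecordsFn`` requests it above. A
minimal sketch (the ``LogPanesFn`` name is illustrative; full pane info is
still being plumbed through, per the TODOs in this commit)::

    import apache_beam as beam

    class LogPanesFn(beam.DoFn):
      def process(self, element,
                  window=beam.DoFn.WindowParam,
                  pane=beam.DoFn.PaneInfoParam):
        # pane.index, pane.is_first and pane.is_last describe which trigger
        # firing of `window` produced this element.
        yield (window, pane.index, element)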