Currently the various file writing configurations take a single parameter,
path, which indicates where the (sharded) output should be placed. In other
words, one can write something like

  pipeline:
    ...
    sink:
      type: WriteToParquet
      config:
        path: /beam/filesystem/dest

and one gets files like "/beam/filesystem/dest-X-of-N".

Of course, in practice file writing is often much more complicated than
this (especially when it comes to Streaming). For reference, I've included
links to our existing offerings in the various SDKs below. I'd like to
start a discussion about what else should go in the "config" parameter and
how it should be expressed in YAML.

The primary concern is around naming. This can generally be split into (1)
the prefix, which must be provided by the user, (2) the sharding
information, which includes both shard counts (e.g. the -X-of-N suffix) and
windowing information (for streaming pipelines), and whose formatting we
may want to let the user customize, and (3) a suffix like .json or .avro
that is useful for both humans and tooling and can often be inferred but
should allow customization as well.

An interesting case is that of dynamic destinations, where the prefix (or
other parameters) may be functions of the records themselves. (I
am excluding the case where the format itself is variable--such cases are
probably better handled by explicitly partitioning the data and doing
multiple writes, as this introduces significant complexities and the set of
possible formats is generally finite and known ahead of time.) I propose
that we leverage the fact that we have structured data to be able to pull
out these dynamic parameters. For example, if we have an input data set
with a string column my_col we could allow something like

  config:
    path: {dynamic: my_col}

which would pull this information out at runtime. (With the MapToFields
transform, it is very easy to compute/append additional fields to existing
records.) Generally this field would then be stripped from the written
data, which would only see the subset of non-dynamically referenced columns
(though this could be configurable: we could add an attribute like
{dynamic: my_col, keep: true}, or require the set of columns to be actually
written (or elided) to be enumerated in the config, or allow/require the
actual data to be written to be in a designated field of the "full" input
records as arranged by a preceding transform). It'd be great to get
input/impressions from a wide range of people here on what would be the
most natural. Often just writing out snippets of various alternatives can
be quite informative (though I'm avoiding putting them here for the moment
to avoid biasing ideas right off the bat).
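
To pin down the semantics only (not the YAML syntax), here is a small
Python sketch of what "pull the destination out of a column and strip it
from the written data" could mean. It uses plain dicts rather than Beam
rows, and the function name and the keep flag are hypothetical stand-ins
for whatever the config ends up offering.

```python
from collections import defaultdict
from typing import Any, Dict, Iterable, List

Record = Dict[str, Any]


def split_by_dynamic_column(records: Iterable[Record], column: str,
                            keep: bool = False) -> Dict[str, List[Record]]:
    """Group records by the value of `column`, used as the output path.

    Hypothetical helper illustrating the semantics under discussion.
    Unless `keep` is set, the dynamic column is dropped from what is
    written.
    """
    by_path: Dict[str, List[Record]] = defaultdict(list)
    for record in records:
        path = record[column]
        if keep:
            written = dict(record)
        else:
            written = {k: v for k, v in record.items() if k != column}
        by_path[path].append(written)
    return dict(by_path)


rows = [
    {"my_col": "/out/a", "value": 1},
    {"my_col": "/out/b", "value": 2},
    {"my_col": "/out/a", "value": 3},
]
print(split_by_dynamic_column(rows, "my_col"))
```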

For streaming pipelines it is often essential to write data out in a
time-partitioned manner. The typical way to do this is to add the windowing
information into the shard specification itself, and a (set of) file(s) is
written on each window closing. Beam YAML already allows any transform to
be given a "windowing" configuration, which causes a WindowInto transform
to be applied to its input(s) before the transform itself; this sits
naturally on a sink. We may want to consider whether non-windowed writes
make sense as well (though how this interacts with the watermark and
underlying implementations is a large open question, so this is a larger
change that might make sense to defer).
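
As a rough illustration of time-partitioned output, the following Python
sketch assigns timestamped elements to fixed windows and derives a label
per window that could be folded into the shard specification. It ignores
watermarks, triggers, and late data entirely; the function names and the
label format are assumptions for the sake of the example.

```python
from collections import defaultdict
from datetime import datetime, timezone
from typing import Dict, Iterable, List, Tuple


def window_start(timestamp: float, size_seconds: int) -> int:
    """Start (in epoch seconds) of the fixed window containing timestamp."""
    return int(timestamp - timestamp % size_seconds)


def window_label(start: int, size_seconds: int) -> str:
    """Hypothetical label format: '<window start>-<window end>' in UTC."""
    fmt = "%Y-%m-%dT%H:%M:%S"
    begin = datetime.fromtimestamp(start, tz=timezone.utc).strftime(fmt)
    end = datetime.fromtimestamp(
        start + size_seconds, tz=timezone.utc).strftime(fmt)
    return f"{begin}-{end}"


def group_by_window(events: Iterable[Tuple[float, str]],
                    size_seconds: int) -> Dict[str, List[str]]:
    """Group (timestamp, value) pairs by the label of their fixed window."""
    grouped: Dict[str, List[str]] = defaultdict(list)
    for timestamp, value in events:
        start = window_start(timestamp, size_seconds)
        grouped[window_label(start, size_seconds)].append(value)
    return dict(grouped)


events = [(5.0, "a"), (65.0, "b"), (59.0, "c")]
for label, values in group_by_window(events, 60).items():
    print(label, values)
```

Each label would then correspond to one (set of) file(s), written when
that window closes.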

Note that I am explicitly excluding "coders" here. All data in YAML should
be schema'd, and writers should know how to write this structured data. We
may want to allow a "schema" field to allow a user to specify the desired
schema in a manner compatible with the sink format itself (e.g. avro, json,
whatever) that could be used both for validation and possibly resolving
ambiguities (e.g. if the sink has an enum format that is not expressed in
the schema of the input PCollection).

Some other configuration options are that some formats (especially
text-based ones) allow for specification of an external compression type
(which may be inferable from the suffix), whether to write a single shard
if the input collection is empty or no shards at all (an occasional user
request that's supported for some Beam sinks now), whether to allow fixed
sharding (generally discouraged, as it disables things like automatic
sharding based on input size, let alone dynamic work rebalancing, though
sometimes this is useful if the input is known to be small and a single
output is desired regardless of the restriction in parallelism), or other
sharding parameters (e.g. limiting the number of total elements or
(approximately) total number of bytes per output shard). Some of these
options may not be available/implemented for all formats--consideration
should be given as to how to handle this inconsistency (runtime errors for
unsupported combinations or simply not allowing them on any until all are
supported).
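
Two of these options are easy to sketch in plain Python: inferring the
compression type from the suffix, and capping the number of records per
shard while optionally emitting a single empty shard for empty input. The
suffix table, the function names, and the parameter names below are all
hypothetical; real sinks may differ in every detail.

```python
from typing import Iterable, List, Optional, TypeVar

T = TypeVar("T")

# Hypothetical suffix table, for illustration only.
_COMPRESSION_BY_SUFFIX = {".gz": "gzip", ".bz2": "bzip2"}


def infer_compression(file_name: str) -> Optional[str]:
    """Return a compression name based on the suffix, or None if unknown."""
    for suffix, compression in _COMPRESSION_BY_SUFFIX.items():
        if file_name.endswith(suffix):
            return compression
    return None


def split_into_shards(records: Iterable[T], max_records_per_shard: int,
                      write_empty_shard: bool = True) -> List[List[T]]:
    """Split records into shards of at most max_records_per_shard each.

    For empty input, returns one empty shard if write_empty_shard is set
    and no shards at all otherwise.
    """
    if max_records_per_shard < 1:
        raise ValueError("max_records_per_shard must be at least 1")
    shards: List[List[T]] = []
    current: List[T] = []
    for record in records:
        current.append(record)
        if len(current) == max_records_per_shard:
            shards.append(current)
            current = []
    if current:
        shards.append(current)
    if not shards and write_empty_shard:
        shards.append([])
    return shards


print(infer_compression("dest-00000-of-00001.json.gz"))
print(split_into_shards(range(5), 2))
print(split_into_shards([], 2, write_empty_shard=False))
```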

A final consideration: we do not anticipate exposing the full complexity of
Beam in the YAML offering. For advanced users, using a "real" SDK will
often be preferable, and we intend to provide a migration path (codegen)
from YAML to a language of your choice. So we should balance
simplicity with completeness and utility here.

Sure, we could just pick something, but given that the main point of YAML
is not capability, but expressibility and ease-of-use, I think it's worth
trying to get the expression of these concepts right. I'm sure many of you
have written a pipeline that writes to files at some point; I'd welcome any
thoughts anyone has on the matter.

- Robert


P.S. A related consideration: how should we distinguish the plain Read
(where the file pattern is given at pipeline construction) from the ReadAll
variants? Should they be separate transforms, or should we instead allow
the same named transform (e.g. ReadFromParquet) to support both modes,
depending on whether an input PCollection or explicit file path is given
(the two being mutually exclusive, with exactly one required, and good
error messaging of course)?
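
For the single-transform option, the "exactly one required" check might
look something like the Python sketch below. The function name, its
arguments, and the returned mode strings are hypothetical; this only
illustrates the validation and error messaging, not an actual Beam API.

```python
from typing import Any, Optional


def resolve_read_mode(path: Optional[str],
                      input_pcollection: Optional[Any]) -> str:
    """Pick between a plain read and a ReadAll-style read.

    Hypothetical helper: exactly one of `path` and `input_pcollection`
    must be given.
    """
    has_path = path is not None
    has_input = input_pcollection is not None
    if has_path and has_input:
        raise ValueError(
            "Specify either a path or an input PCollection of file "
            "patterns, not both.")
    if not has_path and not has_input:
        raise ValueError(
            "Specify either a path or an input PCollection of file "
            "patterns.")
    return "read" if has_path else "read_all"


print(resolve_read_mode("/beam/filesystem/src*", None))
print(resolve_read_mode(None, ["/beam/filesystem/a", "/beam/filesystem/b"]))
```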


Java:
https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/TextIO.Write.html
Python:
https://beam.apache.org/releases/pydoc/current/apache_beam.io.textio.html#apache_beam.io.textio.WriteToText
Go:
https://pkg.go.dev/github.com/apache/beam/sdks/go/pkg/beam/io/textio#Write
TypeScript:
https://beam.apache.org/releases/typedoc/current/functions/io_textio.writeToText.html
Scio:
https://spotify.github.io/scio/api/com/spotify/scio/io/TextIO$$WriteParam.html
