Currently the various file writing configurations take a single parameter, path, which indicates where the (sharded) output should be placed. In other words, one can write something like
pipeline:
  ...
  sink:
    type: WriteToParquet
    config:
      path: /beam/filesystem/dest

and one gets files like "/beam/filesystem/dest-X-of-N".

Of course, in practice file writing is often much more complicated than this (especially when it comes to streaming). For reference, I've included links to our existing offerings in the various SDKs below. I'd like to start a discussion about what else should go in the "config" parameter and how it should be expressed in YAML.

The primary concern is around naming. This can generally be split into (1) the prefix, which must be provided by the user; (2) the sharding information, which includes both shard counts (e.g. the -X-of-N suffix) and windowing information (for streaming pipelines), the formatting of which we may want to allow the user to customize; and (3) a suffix like .json or .avro that is useful for both humans and tooling and can often be inferred, but should allow customization as well.

An interesting case is that of dynamic destinations, where the prefix (or other parameters) may themselves be functions of the records themselves. (I am excluding the case where the format itself is variable--such cases are probably better handled by explicitly partitioning the data and doing multiple writes, as this introduces significant complexities and the set of possible formats is generally finite and known ahead of time.) I propose that we leverage the fact that we have structured data to pull out these dynamic parameters. For example, if we have an input data set with a string column my_col, we could allow something like

config:
  path: {dynamic: my_col}

which would pull this information out at runtime. (With the MapToFields transform, it is very easy to compute/append additional fields to existing records.)
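To make this concrete, here's a sketch of what a full pipeline using dynamic destinations might look like. (The {dynamic: ...} syntax is just the proposal above, not an existing feature, and the field and path names are hypothetical; the MapToFields usage follows current Beam YAML.)

```yaml
pipeline:
  type: chain
  transforms:
    - type: ReadFromCsv
      config:
        path: /input/events*.csv
    # Compute a per-record destination prefix from an existing column.
    - type: MapToFields
      config:
        language: python
        append: true
        fields:
          dest: '"/beam/filesystem/" + event_type'
    - type: WriteToParquet
      config:
        # Resolved per record at runtime; the dest field itself would
        # (by default) be stripped from the written data.
        path: {dynamic: dest}
```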
Generally this field would then be stripped from the written data, which would only see the subset of non-dynamically-referenced columns. (This could be configurable: we could add an attribute like {dynamic: my_col, keep: true}, or require the set of columns to be actually written (or elided) to be enumerated in the config, or allow/require the actual data to be written to be in a designated field of the "full" input records as arranged by a preceding transform.) It'd be great to get input/impressions from a wide range of people here on what would be the most natural. Often just writing out snippets of various alternatives can be quite informative (though I'm avoiding putting them here for the moment to avoid biasing ideas right off the bat).

For streaming pipelines it is often essential to write data out in a time-partitioned manner. The typical way to do this is to add the windowing information into the shard specification itself, and a (set of) file(s) is written on each window closing. Beam YAML already supports giving any transform a "windowing" configuration, which causes a WindowInto transform to be applied to its input(s) before application; this sits naturally on a sink. We may want to consider whether non-windowed writes make sense as well (though how this interacts with the watermark and underlying implementations is a large open question, so this is a larger change that might make sense to defer).

Note that I am explicitly excluding "coders" here. All data in YAML should be schema'd, and writers should know how to write this structured data. We may want to allow a "schema" field letting a user specify the desired schema in a manner compatible with the sink format itself (e.g. avro, json, whatever) that could be used both for validation and possibly for resolving ambiguities (e.g. if the sink has an enum format that is not expressed in the schema of the input PCollection).
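For example, attaching the existing "windowing" configuration directly to a sink might look like the following sketch (the windowing block follows current Beam YAML conventions; the surrounding transform and input name are illustrative):

```yaml
- type: WriteToJson
  input: processed
  config:
    path: /beam/filesystem/dest
  # Applies a WindowInto to the sink's input, so a (set of)
  # file(s) is written out as each fixed window closes.
  windowing:
    type: fixed
    size: 60s
```

The open question is then how the window information gets spliced into the shard names, and whether that formatting should itself be customizable.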
Some other configuration options: some formats (especially text-based ones) allow for specification of an external compression type (which may be inferable from the suffix); whether to write a single empty shard or no shards at all if the input collection is empty (an occasional user request that's supported for some Beam sinks now); whether to allow fixed sharding (generally discouraged, as it disables things like automatic sharding based on input size, let alone dynamic work rebalancing, though sometimes this is useful if the input is known to be small and a single output is desired regardless of the restriction in parallelism); or other sharding parameters (e.g. limiting the total number of elements or (approximately) total number of bytes per output shard). Some of these options may not be available/implemented for all formats--consideration should be given as to how to handle this inconsistency (runtime errors for unsupported combinations, or simply not allowing them on any format until all are supported).

A final consideration: we do not anticipate exposing the full complexity of Beam in the YAML offering. For advanced users, using a "real" SDK will often be preferable, and we intend to provide a migration path from YAML to a language of your choice (codegen). So we should balance simplicity with completeness and utility here. Sure, we could just pick something, but given that the main point of YAML is not capability, but expressibility and ease-of-use, I think it's worth trying to get the expression of these concepts right. I'm sure many of you have written a pipeline to files at some point in time; I'd welcome any thoughts anyone has on the matter.

- Robert

P.S. A related consideration: how should we distinguish the plain Read (where the file pattern is given at pipeline construction) from the ReadAll variants? Should they be separate transforms, or should we instead allow the same named transform (e.g. ReadFromParquet) to support both modes, depending on whether an input PCollection or explicit file path is given (the two being mutually exclusive, with exactly one required, and good error messaging of course)?

Java: https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/TextIO.Write.html
Python: https://beam.apache.org/releases/pydoc/current/apache_beam.io.textio.html#apache_beam.io.textio.WriteToText
Go: https://pkg.go.dev/github.com/apache/beam/sdks/go/pkg/beam/io/textio#Write
Typescript: https://beam.apache.org/releases/typedoc/current/functions/io_textio.writeToText.html
Scio: https://spotify.github.io/scio/api/com/spotify/scio/io/TextIO$$WriteParam.html
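As a sketch of how some of the naming and sharding options discussed above might be spelled (every key name here is hypothetical and exactly the kind of thing I'd like feedback on):

```yaml
sink:
  type: WriteToParquet
  config:
    path: /beam/filesystem/dest
    suffix: .parquet            # often inferable from the format
    compression: gzip           # external compression, mostly for text formats
    write_empty_shard: false    # emit no files at all for an empty input
    num_shards: 1               # fixed sharding; disables dynamic rebalancing
    max_records_per_shard: 1000000
    max_bytes_per_shard: 256000000
```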
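And for the P.S., the two modes of a merged read transform might look like the following (a sketch only; whether one transform should support both modes is precisely the question):

```yaml
# Mode 1: file pattern known at pipeline construction time.
- type: ReadFromParquet
  config:
    path: /beam/filesystem/input-*.parquet

# Mode 2: patterns arrive as an input PCollection (the ReadAll style).
- type: ReadFromParquet
  input: computed_paths
```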