On Mon, Oct 9, 2023 at 1:49 PM Reuven Lax <re...@google.com> wrote:
> Just FYI - the reason why names (including prefixes) in DynamicDestinations were parameterized via a lambda instead of just having the user add them via MapElements is performance. We discussed something along the lines of what you are suggesting (essentially having the user create a KV where the key contained the dynamic information). The problem was that the generated filepath was often much larger (sometimes by two orders of magnitude) than the information in the record, and there was a desire to avoid record blowup. For example, the record might contain a single integer userid, and the filepath prefix would then be /long/path/to/output/users/<id>. This was especially bad in cases where the data had to be shuffled, and the existing dynamic destinations method allowed extracting the filepath only _after_ the shuffle.

That is a consideration I hadn't thought much about; thanks for bringing it up.

> Now there may not be any good way to keep this benefit in a declarative approach such as YAML (or at least a good easy way - we could always allow the user to pass in a SQL expression to extract the filename from the record!), but we should keep in mind that this might mean that YAML-generated pipelines will be less efficient for certain use cases.

Yep, it's not as straightforward to do in a declarative way. I would like to avoid mixing in UDFs (with their associated languages and execution environments) if possible. Though I'd like the performance of a "straightforward" YAML pipeline to match what one can get by writing straight-line Java (and possibly exceed it, if we can leverage the structure of schemas everywhere), this is not an absolute requirement for all features. I wonder if separating out a constant prefix from the dynamic part could be sufficient to mitigate the blowup of precomputing this in most cases (especially in the context of a larger pipeline).
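To put rough numbers on the record-blowup concern and on the constant-prefix mitigation, here is a minimal plain-Python model (no Beam APIs). It assumes, hypothetically, that a shuffle carries each record's JSON encoding plus its destination key, and compares keying by the full path against keying by only the user id, with the constant prefix re-attached after the shuffle. PREFIX and the record layout are made up for illustration.

```python
# Illustrative model only; no Beam APIs are used.
# PREFIX and the {"userid": ...} record layout are hypothetical.
import json

PREFIX = "/long/path/to/output/users/"


def full_path_key(record: dict) -> str:
    """Destination computed in full before the shuffle."""
    return f"{PREFIX}{record['userid']}"


def dynamic_part_key(record: dict) -> str:
    """Only the dynamic part of the destination travels with the record."""
    return str(record["userid"])


def destination_after_shuffle(key: str) -> str:
    """Re-attach the constant prefix once the data has been grouped."""
    return PREFIX + key


def shuffled_bytes(records, key_fn) -> int:
    """Approximate shuffle volume: UTF-8 key bytes plus JSON record bytes."""
    return sum(
        len(key_fn(r).encode("utf-8")) + len(json.dumps(r).encode("utf-8"))
        for r in records
    )


records = [{"userid": i} for i in range(1000)]
full = shuffled_bytes(records, full_path_key)
dynamic = shuffled_bytes(records, dynamic_part_key)
print(f"full path keys: {full} bytes, dynamic-part keys: {dynamic} bytes")
```

In this model both approaches resolve to the same destinations, and the full-path variant shuffles exactly len(PREFIX) extra bytes per record; real encodings and shuffle costs will differ.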
Alternatively, rather than just a sharding pattern, one could have a full filepattern that includes format parameters for dynamically computed bits as well as the shard number, windowing info, etc. (There are pros and cons to this.)

> On Mon, Oct 9, 2023 at 12:37 PM Robert Bradshaw via dev <dev@beam.apache.org> wrote:
>
>> Currently the various file writing configurations take a single parameter, path, which indicates where the (sharded) output should be placed. In other words, one can write something like
>>
>> pipeline:
>>   ...
>>   sink:
>>     type: WriteToParquet
>>     config:
>>       path: /beam/filesystem/dest
>>
>> and one gets files like "/beam/filesystem/dest-X-of-N".
>>
>> Of course, in practice file writing is often much more complicated than this (especially when it comes to streaming). For reference, I've included links to our existing offerings in the various SDKs below. I'd like to start a discussion about what else should go in the "config" parameter and how it should be expressed in YAML.
>>
>> The primary concern is around naming. This can generally be split into (1) the prefix, which must be provided by the user, (2) the sharding information, which includes both shard counts (e.g. the -X-of-N suffix) and windowing information (for streaming pipelines), whose formatting we may want to let the user customize, and (3) a suffix like .json or .avro that is useful for both humans and tooling and can often be inferred, but should allow customization as well.
>>
>> An interesting case is that of dynamic destinations, where the prefix (or other parameters) may itself be a function of the records. (I am excluding the case where the format itself is variable; such cases are probably better handled by explicitly partitioning the data and doing multiple writes, as variable formats introduce significant complexity and the set of possible formats is generally finite and known ahead of time.)
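The full-filepattern alternative and the dynamic-destinations case can be modeled together in a few lines of plain Python (again, not a Beam API). In this sketch, which is only one possible design, the pattern uses Python str.format placeholders: shard and num_shards are filled in by the writer, every other placeholder names a record field, records are grouped by those fields, and the referenced fields are dropped from the written rows (the default proposed in the following paragraph). The placeholder syntax, the reserved names, and plan_writes are all hypothetical; window bounds could be handled as further reserved placeholders.

```python
# Hypothetical model of templated, dynamic file naming; not a Beam API.
from collections import defaultdict
from string import Formatter

# Placeholders filled in by the writer rather than taken from records.
RESERVED = {"shard", "num_shards"}


def dynamic_fields(pattern: str) -> set:
    """Placeholder names in the pattern that must come from record fields."""
    names = {field for _, field, _, _ in Formatter().parse(pattern) if field}
    return names - RESERVED


def plan_writes(records, pattern: str, num_shards: int = 1) -> dict:
    """Map each output file name to the rows that would be written to it.

    Records are grouped by the values of the fields referenced in the
    pattern, those fields are stripped from the written rows, and rows are
    assigned round-robin to num_shards shards per group (so a shard can be
    empty when a group has fewer rows than num_shards).
    """
    fields = sorted(dynamic_fields(pattern))
    groups = defaultdict(list)
    for record in records:
        key = tuple(record[f] for f in fields)
        groups[key].append({k: v for k, v in record.items() if k not in fields})

    files = {}
    for key, rows in groups.items():
        params = dict(zip(fields, key))
        for shard in range(num_shards):
            name = pattern.format(shard=shard, num_shards=num_shards, **params)
            files[name] = rows[shard::num_shards]
    return files


records = [
    {"country": "US", "user": 1},
    {"country": "DE", "user": 2},
    {"country": "US", "user": 3},
]
pattern = "/out/{country}/part-{shard:05d}-of-{num_shards:05d}.json"
for name, rows in sorted(plan_writes(records, pattern).items()):
    print(name, rows)
```

With no dynamic placeholders the same function covers the static case: the pattern "/beam/filesystem/dest-{shard:05d}-of-{num_shards:05d}" with num_shards=2 yields /beam/filesystem/dest-00000-of-00002 and /beam/filesystem/dest-00001-of-00002.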
>>
>> I propose that we leverage the fact that we have structured data to be able to pull out these dynamic parameters. For example, if we have an input data set with a string column my_col, we could allow something like
>>
>>   config:
>>     path: {dynamic: my_col}
>>
>> which would pull this information out at runtime. (With the MapToFields transform, it is very easy to compute/append additional fields to existing records.) Generally this field would then be stripped from the written data, which would only see the subset of non-dynamically referenced columns. This could be configurable, though: we could add an attribute like {dynamic: my_col, keep: true}, require the set of columns to be actually written (or elided) to be enumerated in the config, or allow/require the actual data to be written to be in a designated field of the "full" input records as arranged by a preceding transform. It'd be great to get input/impressions from a wide range of people here on what would be the most natural. Often just writing out snippets of various alternatives can be quite informative (though I'm avoiding putting them here for the moment to avoid biasing ideas right off the bat).
>>
>> For streaming pipelines it is often essential to write data out in a time-partitioned manner. The typical way to do this is to add the windowing information into the shard specification itself, and a (set of) file(s) is written as each window closes. Beam YAML already supports giving any transform a "windowing" configuration, which causes a WindowInto transform to be applied to its input(s) before the transform itself; this fits naturally on a sink. We may want to consider whether non-windowed writes make sense as well (though how this interacts with the watermark and the underlying implementations is a large open question, so this is a larger change that might make sense to defer).
>>
>> Note that I am explicitly excluding "coders" here.
>> All data in YAML should be schema'd, and writers should know how to write this structured data. We may want to allow a "schema" field that lets a user specify the desired schema in a manner compatible with the sink format itself (e.g. Avro, JSON, whatever), which could be used both for validation and possibly for resolving ambiguities (e.g. if the sink has an enum format that is not expressed in the schema of the input PCollection).
>>
>> Some other configuration options to consider: an external compression type for formats that allow one (especially text-based ones), which may be inferable from the suffix; whether to write a single empty shard or no shards at all if the input collection is empty (an occasional user request that's supported for some Beam sinks now); whether to allow fixed sharding (generally discouraged, as it disables things like automatic sharding based on input size, let alone dynamic work rebalancing, though sometimes this is useful if the input is known to be small and a single output is desired regardless of the restriction in parallelism); and other sharding parameters (e.g. limiting the total number of elements or the (approximate) total number of bytes per output shard). Some of these options may not be available/implemented for all formats, so consideration should be given to how to handle this inconsistency (runtime errors for unsupported combinations, or simply not allowing an option for any format until all formats support it).
>>
>> A final consideration: we do not anticipate exposing the full complexity of Beam in the YAML offering. For advanced users, using a "real" SDK will often be preferable, and we intend to provide a migration path (via codegen) from YAML to a language of your choice. So we should balance simplicity with completeness and utility here.
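Two of the sharding options listed earlier in this message, a cap on elements per shard and the choice between one empty shard or none for empty input, can be pinned down precisely. The following plain-Python sketch assumes hypothetical option names (max_records_per_shard, write_empty_shard); it is not how any existing Beam sink implements these options.

```python
# Hypothetical model of two sharding options; option names are made up.
from typing import List


def shard_rows(
    rows: List[dict],
    max_records_per_shard: int,
    write_empty_shard: bool = True,
) -> List[List[dict]]:
    """Split rows into shards holding at most max_records_per_shard rows.

    For empty input, return a single empty shard if write_empty_shard is
    true, otherwise no shards at all.
    """
    if max_records_per_shard < 1:
        raise ValueError("max_records_per_shard must be at least 1")
    if not rows:
        return [[]] if write_empty_shard else []
    return [
        rows[i:i + max_records_per_shard]
        for i in range(0, len(rows), max_records_per_shard)
    ]


shards = shard_rows([{"i": i} for i in range(5)], max_records_per_shard=2)
print([len(s) for s in shards])  # prints [2, 2, 1]
```

In this model the shard count N in -X-of-N depends on the input size once a per-shard cap is set, which is one way such options interact with the naming question.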
>>
>> Sure, we could just pick something, but given that the main point of YAML is not capability but expressibility and ease of use, I think it's worth trying to get the expression of these concepts right. I'm sure many of you have written a pipeline that writes to files at some point; I'd welcome any thoughts anyone has on the matter.
>>
>> - Robert
>>
>>
>> P.S. A related consideration: how should we distinguish the plain Read (where the file pattern is given at pipeline construction) from the ReadAll variants? Should they be separate transforms, or should we instead allow the same named transform (e.g. ReadFromParquet) to support both modes, depending on whether an input PCollection or an explicit file path is given (the two being mutually exclusive, with exactly one required, and good error messaging of course)?
>>
>>
>> Java:
>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/TextIO.Write.html
>> Python:
>> https://beam.apache.org/releases/pydoc/current/apache_beam.io.textio.html#apache_beam.io.textio.WriteToText
>> Go:
>> https://pkg.go.dev/github.com/apache/beam/sdks/go/pkg/beam/io/textio#Write
>> Typescript:
>> https://beam.apache.org/releases/typedoc/current/functions/io_textio.writeToText.html
>> Scio:
>> https://spotify.github.io/scio/api/com/spotify/scio/io/TextIO$$WriteParam.html
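For the single-transform option raised in the P.S., the essential rule is that exactly one of an explicit path or an input PCollection is given. A minimal plain-Python sketch of that check follows; read_mode, its parameters, and the error wording are hypothetical, and in practice such a check would presumably run at pipeline construction time.

```python
# Hypothetical validation for a Read transform that accepts either a
# construction-time path (plain Read) or an input PCollection (ReadAll).
from typing import Optional


def read_mode(path: Optional[str], has_input: bool) -> str:
    """Return "read" or "read_all", requiring exactly one source of files."""
    if path is not None and has_input:
        raise ValueError(
            "Specify either 'path' or an input PCollection of file names, not both."
        )
    if path is None and not has_input:
        raise ValueError(
            "No files to read: specify 'path' or provide an input PCollection."
        )
    return "read" if path is not None else "read_all"


print(read_mode("/beam/filesystem/dest-*", has_input=False))  # prints read
```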