I suspect some simple pattern templating would solve most use cases. We probably would want to support timestamp formatting (e.g. $YYYY $M $D) as well.
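To make the templating idea concrete, here is a minimal sketch in plain Python (no Beam APIs involved). Only $YYYY, $M and $D come from the suggestion above; the {field} syntax for record columns, the zero-padding, and the name expand_path are assumptions made up for illustration, not a proposed spec.

```python
import re
from datetime import datetime, timezone

# Timestamp placeholders. $YYYY, $M and $D are the ones mentioned in this
# thread; zero-padding month and day to two digits is an assumption.
_TIMESTAMP_FIELDS = {
    "YYYY": lambda ts: f"{ts.year:04d}",
    "M": lambda ts: f"{ts.month:02d}",
    "D": lambda ts: f"{ts.day:02d}",
}

# Matches either a timestamp placeholder ($YYYY, $M, $D) or a hypothetical
# {field} reference to a column of the record being written.
_PLACEHOLDER = re.compile(r"\$(YYYY|M|D)\b|\{(\w+)\}")


def expand_path(template, record, timestamp):
    """Expand a path template using a record's fields and a timestamp."""

    def substitute(match):
        ts_name, field_name = match.groups()
        if ts_name is not None:
            return _TIMESTAMP_FIELDS[ts_name](timestamp)
        return str(record[field_name])

    return _PLACEHOLDER.sub(substitute, template)


example = expand_path(
    "/long/path/to/output/users/{userid}/$YYYY/$M/$D/part",
    {"userid": 42},
    datetime(2023, 10, 10, tzinfo=timezone.utc),
)
```

Because the template is a constant and only the referenced fields vary, the full path could in principle be expanded late (e.g. after a shuffle), which is relevant to the record-blowup concern raised further down the thread.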
On Tue, Oct 10, 2023 at 3:35 PM Robert Bradshaw <rober...@google.com> wrote:

> On Mon, Oct 9, 2023 at 3:09 PM Chamikara Jayalath <chamik...@google.com> wrote:
>
>> I would say:
>>
>> sink:
>>   type: WriteToParquet
>>   config:
>>     path: /beam/filesystem/dest
>>     prefix: <my prefix>
>>     suffix: <my suffix>
>>
>> The underlying SDK will add the middle part of the file names to make
>> sure that files generated by various bundles/windows/shards do not
>> conflict.
>>
>
> What's the relationship between path and prefix? Is path the
> directory part of the full path, or does prefix precede it?
>
>> This will satisfy the vast majority of use-cases I believe. Fully
>> customizing the file pattern sounds like a more advanced use case that
>> can be left for "real" SDKs.
>>
>
> Yea, we don't have to do everything.
>
>> For dynamic destinations, I think just making the "path" component
>> support a lambda that is parameterized by the input should be adequate
>> since this allows customers to direct files written to different
>> destination directories.
>>
>> sink:
>>   type: WriteToParquet
>>   config:
>>     path: <destination lambda>
>>     prefix: <my prefix>
>>     suffix: <my suffix>
>>
>> I'm not sure what would be the best way to specify a lambda here though.
>> Maybe a regex or the name of a Python callable?
>>
>
> I'd rather not require Python for a pure Java pipeline, but some kind of
> a pattern template may be sufficient here.
>
>> On Mon, Oct 9, 2023 at 2:06 PM Robert Bradshaw via dev
>> <dev@beam.apache.org> wrote:
>>
>>> On Mon, Oct 9, 2023 at 1:49 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> Just FYI - the reason why names (including prefixes) in
>>>> DynamicDestinations were parameterized via a lambda instead of just
>>>> having the user add it via MapElements is performance. We discussed
>>>> something along the lines of what you are suggesting (essentially
>>>> having the user create a KV where the key contained the dynamic
>>>> information).
>>>> The problem was that the size of the generated filepath was often
>>>> much larger (sometimes by two orders of magnitude) than the
>>>> information in the record, and there was a desire to avoid record
>>>> blowup. For example, the record might contain a single integer userid,
>>>> and the filepath prefix would then be /long/path/to/output/users/<id>.
>>>> This was especially bad in cases where the data had to be shuffled,
>>>> and the existing dynamic destinations method allowed extracting the
>>>> filepath only _after_ the shuffle.
>>>>
>>>
>>> That is a consideration I hadn't thought much of, thanks for
>>> bringing this up.
>>>
>>>> Now there may not be any good way to keep this benefit in a
>>>> declarative approach such as YAML (or at least a good easy way - we
>>>> could always allow the user to pass in a SQL expression to extract the
>>>> filename from the record!), but we should keep in mind that this might
>>>> mean that YAML-generated pipelines will be less efficient for certain
>>>> use cases.
>>>>
>>>
>>> Yep, it's not as straightforward to do in a declarative way. I would
>>> like to avoid mixing UDFs (with their associated languages and
>>> execution environments) if possible. Though I'd like the performance of
>>> a "straightforward" YAML pipeline to be that which one can get writing
>>> straight-line Java (and possibly better, if we can leverage the
>>> structure of schemas everywhere), this is not an absolute requirement
>>> for all features.
>>>
>>> I wonder if separating out a constant prefix vs. the dynamic stuff
>>> could be sufficient to mitigate the blow-up of pre-computing this in
>>> most cases (especially in the context of a larger pipeline).
>>> Alternatively, rather than just a sharding pattern, one could have a
>>> full filepattern that includes format parameters for dynamically
>>> computed bits as well as the shard number, windowing info, etc. (There
>>> are pros and cons to this.)
>>>
>>>> On Mon, Oct 9, 2023 at 12:37 PM Robert Bradshaw via dev
>>>> <dev@beam.apache.org> wrote:
>>>>
>>>>> Currently the various file writing configurations take a single
>>>>> parameter, path, which indicates where the (sharded) output should be
>>>>> placed. In other words, one can write something like
>>>>>
>>>>> pipeline:
>>>>>   ...
>>>>>   sink:
>>>>>     type: WriteToParquet
>>>>>     config:
>>>>>       path: /beam/filesystem/dest
>>>>>
>>>>> and one gets files like "/beam/filesystem/dest-X-of-N"
>>>>>
>>>>> Of course, in practice file writing is often much more complicated
>>>>> than this (especially when it comes to streaming). For reference,
>>>>> I've included links to our existing offerings in the various SDKs
>>>>> below. I'd like to start a discussion about what else should go in
>>>>> the "config" parameter and how it should be expressed in YAML.
>>>>>
>>>>> The primary concern is around naming. This can generally be split
>>>>> into (1) the prefix, which must be provided by the user, (2) the
>>>>> sharding information, which includes both shard counts (e.g. the
>>>>> -X-of-N suffix) and windowing information (for streaming pipelines),
>>>>> and whose formatting we may want to allow the user to customize, and
>>>>> (3) a suffix like .json or .avro that is useful for both humans and
>>>>> tooling and can often be inferred but should allow customization as
>>>>> well.
>>>>>
>>>>> An interesting case is that of dynamic destinations, where the prefix
>>>>> (or other parameters) may themselves be functions of the records.
>>>>> (I am excluding the case where the format itself is variable--such
>>>>> cases are probably better handled by explicitly partitioning the data
>>>>> and doing multiple writes, as this introduces significant
>>>>> complexities and the set of possible formats is generally finite and
>>>>> known ahead of time.)
>>>>> I propose that we leverage the fact that we have structured data to
>>>>> be able to pull out these dynamic parameters. For example, if we have
>>>>> an input data set with a string column my_col we could allow
>>>>> something like
>>>>>
>>>>> config:
>>>>>   path: {dynamic: my_col}
>>>>>
>>>>> which would pull this information out at runtime. (With the
>>>>> MapToFields transform, it is very easy to compute/append additional
>>>>> fields to existing records.) Generally this field would then be
>>>>> stripped from the written data, which would only see the subset of
>>>>> non-dynamically referenced columns (though this could be
>>>>> configurable: we could add an attribute like
>>>>> {dynamic: my_col, keep: true}, or require the set of columns to be
>>>>> actually written (or elided) to be enumerated in the config, or
>>>>> allow/require the actual data to be written to be in a designated
>>>>> field of the "full" input records as arranged by a preceding
>>>>> transform). It'd be great to get input/impressions from a wide range
>>>>> of people here on what would be the most natural. Often just writing
>>>>> out snippets of various alternatives can be quite informative (though
>>>>> I'm avoiding putting them here for the moment to avoid biasing ideas
>>>>> right off the bat).
>>>>>
>>>>> For streaming pipelines it is often essential to write data out in a
>>>>> time-partitioned manner. The typical way to do this is to add the
>>>>> windowing information into the shard specification itself, and a
>>>>> (set of) file(s) is written on each window closing. Beam YAML already
>>>>> supports any transform being given a "windowing" configuration, which
>>>>> causes a WindowInto transform to be applied to its input(s) before
>>>>> application; this can sit naturally on a sink.
>>>>> We may want to consider if non-windowed writes make sense as well
>>>>> (though how this interacts with the watermark and underlying
>>>>> implementations is a large open question, so this is a larger change
>>>>> that might make sense to defer).
>>>>>
>>>>> Note that I am explicitly excluding "coders" here. All data in YAML
>>>>> should be schema'd, and writers should know how to write this
>>>>> structured data. We may want to allow a "schema" field to allow a
>>>>> user to specify the desired schema in a manner compatible with the
>>>>> sink format itself (e.g. avro, json, whatever) that could be used
>>>>> both for validation and possibly resolving ambiguities (e.g. if the
>>>>> sink has an enum format that is not expressed in the schema of the
>>>>> input PCollection).
>>>>>
>>>>> Some other configuration options are that some formats (especially
>>>>> text-based ones) allow for specification of an external compression
>>>>> type (which may be inferable from the suffix), whether to write a
>>>>> single shard if the input collection is empty or no shards at all (an
>>>>> occasional user request that's supported for some Beam sinks now),
>>>>> whether to allow fixed sharding (generally discouraged, as it
>>>>> disables things like automatic sharding based on input size, let
>>>>> alone dynamic work rebalancing, though sometimes this is useful if
>>>>> the input is known to be small and a single output is desired
>>>>> regardless of the restriction in parallelism), or other sharding
>>>>> parameters (e.g. limiting the number of total elements or
>>>>> (approximately) total number of bytes per output shard). Some of
>>>>> these options may not be available/implemented for all
>>>>> formats--consideration should be given as to how to handle this
>>>>> inconsistency (runtime errors for unsupported combinations or simply
>>>>> not allowing them on any until all are supported).
>>>>>
>>>>> A final consideration: we do not anticipate exposing the full
>>>>> complexity of Beam in the YAML offering. For advanced users, using a
>>>>> "real" SDK will often be preferable, and we intend to provide a
>>>>> migration path (codegen) from YAML to a language of your choice. So
>>>>> we should balance simplicity with completeness and utility here.
>>>>>
>>>>> Sure, we could just pick something, but given that the main point of
>>>>> YAML is not capability, but expressibility and ease-of-use, I think
>>>>> it's worth trying to get the expression of these concepts right. I'm
>>>>> sure many of you have written a pipeline that writes to files at some
>>>>> point in time; I'd welcome any thoughts anyone has on the matter.
>>>>>
>>>>> - Robert
>>>>>
>>>>>
>>>>> P.S. A related consideration: how should we distinguish the plain
>>>>> Read (where the file pattern is given at pipeline construction) from
>>>>> the ReadAll variants? Should they be separate transforms, or should
>>>>> we instead allow the same named transform (e.g. ReadFromParquet) to
>>>>> support both modes, depending on whether an input PCollection or
>>>>> explicit file path is given (the two being mutually exclusive, with
>>>>> exactly one required, and good error messaging of course)?
>>>>>
>>>>>
>>>>> Java:
>>>>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/TextIO.Write.html
>>>>> Python:
>>>>> https://beam.apache.org/releases/pydoc/current/apache_beam.io.textio.html#apache_beam.io.textio.WriteToText
>>>>> Go:
>>>>> https://pkg.go.dev/github.com/apache/beam/sdks/go/pkg/beam/io/textio#Write
>>>>> Typescript:
>>>>> https://beam.apache.org/releases/typedoc/current/functions/io_textio.writeToText.html
>>>>> Scio:
>>>>> https://spotify.github.io/scio/api/com/spotify/scio/io/TextIO$$WriteParam.html
>>>>>
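
For the {dynamic: my_col} proposal in the original message, the intended semantics (route by the column, then strip it unless asked to keep it) might look roughly like the following. This is a plain-Python sketch over dicts, not Beam code; route_records and its keep flag are hypothetical names that only mirror the attribute floated in the thread.

```python
from collections import defaultdict


def route_records(records, dynamic_col, keep=False):
    """Group rows by the value of dynamic_col.

    The dynamic column is dropped from the written payload unless keep is
    True, mirroring the hypothetical {dynamic: my_col, keep: true} option.
    """
    by_destination = defaultdict(list)
    for record in records:
        destination = str(record[dynamic_col])
        if keep:
            payload = dict(record)
        else:
            payload = {k: v for k, v in record.items() if k != dynamic_col}
        by_destination[destination].append(payload)
    return dict(by_destination)


rows = [
    {"my_col": "a", "x": 1},
    {"my_col": "b", "x": 2},
    {"my_col": "a", "x": 3},
]
stripped = route_records(rows, "my_col")
kept = route_records(rows, "my_col", keep=True)
```

Each key of the result would correspond to one output destination; a real sink would still append shard and window information to the resulting paths.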