That's a good point. In the dbt case they're almost always treating that as a precomputation. I suppose a JinjaTransform isn't totally insane, but I'm not sure I'd want to introduce Yet Another Way Of Writing A Lambda :-)

On Tue, Oct 10, 2023 at 3:22 PM Robert Bradshaw <rober...@google.com> wrote:

> On Tue, Oct 10, 2023 at 7:22 AM Byron Ellis via dev <dev@beam.apache.org>
> wrote:
>
>> FWIW dbt (which is also YAML and has this problem for other reasons) does
>> something like this. It also chooses to assume that everything is a string
>> but allows users to use the Jinja templating language to make those strings
>> dynamic where needed.
>>
>
> Are these only for values that are filled in at runtime (i.e. Jinja is a
> pre-processor used before the YAML file is passed to dbt) or can they be
> plugged in (possibly on a per-record basis) from the data itself? (FWIW, I
> think we also want to allow some kind of templating like this to allow for
> parameterized composite PTransforms to be defined in YAML, and additionally
> we'll need it for YAML-defined templates (not to be confused with the YAML
> template, which is a single flex template whose single parameter is the
> YAML file itself).)
>
>
>> Syntactically I think that's a bit nicer to look at than the shell script
>> style and saves having to remember the difference between $() and ${}
>>
>
> +1
>
>
>> On Tue, Oct 10, 2023 at 7:10 AM Kenneth Knowles <k...@apache.org> wrote:
>>
>>> Since I've been in GHA files lately...
>>>
>>> I think they have a very useful pattern which we could borrow from or
>>> learn from, where setting up the variables happens separately, like
>>> https://github.com/apache/beam/blob/57821c191d322f9f21c01a34c55e0c40eda44f1e/.github/workflows/build_release_candidate.yml#L270
>>>
>>> We could call the section "vars", and then the config could use the vars
>>> in the destination.
>>> I'm making this example deliberately a little gross:
>>>
>>> - vars:
>>>     - USER_REGION: $.user.metadata.region
>>>     - USER_GROUP: $.user.groups[0].name
>>> - config:
>>>     - path:
>>>       gs://output-bucket-${vars.USER_REGION}/files/${vars.USER_GROUP}-${fileio.SHARD_NUM}-${fileio.WINDOW}
>>>
>>> I think it strikes a good balance between arbitrary lambdas and just a
>>> prefix/suffix control, giving a really easy place where we can say "the
>>> whole value of this YAML field is a path expression into the structured
>>> data"
>>>
>>> Kenn
>>>
>>> On Mon, Oct 9, 2023 at 6:09 PM Chamikara Jayalath via dev <
>>> dev@beam.apache.org> wrote:
>>>
>>>> I would say:
>>>>
>>>> sink:
>>>>   type: WriteToParquet
>>>>   config:
>>>>     path: /beam/filesystem/dest
>>>>     prefix: <my prefix>
>>>>     suffix: <my suffix>
>>>>
>>>> The underlying SDK will add the middle part of the file names to make
>>>> sure that files generated by various bundles/windows/shards do not
>>>> conflict.
>>>>
>>>> This will satisfy the vast majority of use cases, I believe. Fully
>>>> customizing the file pattern sounds like a more advanced use case that
>>>> can be left for "real" SDKs.
>>>>
>>>> For dynamic destinations, I think just making the "path" component
>>>> support a lambda that is parameterized by the input should be adequate,
>>>> since this allows customers to direct files to different destination
>>>> directories.
>>>>
>>>> sink:
>>>>   type: WriteToParquet
>>>>   config:
>>>>     path: <destination lambda>
>>>>     prefix: <my prefix>
>>>>     suffix: <my suffix>
>>>>
>>>> I'm not sure what would be the best way to specify a lambda here
>>>> though. Maybe a regex or the name of a Python callable?
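
As a rough illustration of how the ${vars...} path in Kenn's example might be expanded for a single record, here is a minimal Python sketch. The placeholder names come from that example; expand_path, the scopes dict, and the sample values are hypothetical and are not part of Beam YAML.

```python
import re

# Matches ${scope.NAME}, e.g. ${vars.USER_REGION} or ${fileio.SHARD_NUM}.
_PLACEHOLDER = re.compile(r"\$\{([A-Za-z_][\w.]*)\}")


def expand_path(template, scopes):
    """Replace each ${scope.NAME} with str(scopes[scope][NAME])."""
    def lookup(match):
        scope, _, name = match.group(1).partition(".")
        try:
            return str(scopes[scope][name])
        except KeyError:
            raise KeyError(f"unknown placeholder: {match.group(0)}") from None
    return _PLACEHOLDER.sub(lookup, template)


template = ("gs://output-bucket-${vars.USER_REGION}/files/"
            "${vars.USER_GROUP}-${fileio.SHARD_NUM}-${fileio.WINDOW}")

# Hypothetical values: "vars" would be extracted from the record,
# "fileio" would be supplied by the sink at write time.
scopes = {
    "vars": {"USER_REGION": "us-east1", "USER_GROUP": "admins"},
    "fileio": {"SHARD_NUM": 3, "WINDOW": "2023-10-10T15:00"},
}

path = expand_path(template, scopes)
print(path)
```

Keeping the record-derived values ("vars") separate from the sink-provided ones ("fileio") mirrors the split in the example: only the former would need a path expression into the data.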
>>>>
>>>> Thanks,
>>>> Cham
>>>>
>>>> On Mon, Oct 9, 2023 at 2:06 PM Robert Bradshaw via dev <
>>>> dev@beam.apache.org> wrote:
>>>>
>>>>> On Mon, Oct 9, 2023 at 1:49 PM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> Just FYI - the reason why names (including prefixes) in
>>>>>> DynamicDestinations were parameterized via a lambda instead of just
>>>>>> having the user add it via MapElements is performance. We discussed
>>>>>> something along the lines of what you are suggesting (essentially
>>>>>> having the user create a KV where the key contained the dynamic
>>>>>> information). The problem was that the size of the generated filepath
>>>>>> was often much larger (sometimes by 2 orders of magnitude) than the
>>>>>> information in the record, and there was a desire to avoid record
>>>>>> blowup. E.g. the record might contain a single integer userid, and the
>>>>>> filepath prefix would then be /long/path/to/output/users/<id>. This
>>>>>> was especially bad in cases where the data had to be shuffled, and the
>>>>>> existing dynamic destinations method allowed extracting the filepath
>>>>>> only _after_ the shuffle.
>>>>>>
>>>>>
>>>>> That is a consideration I hadn't thought much of, thanks for
>>>>> bringing this up.
>>>>>
>>>>>
>>>>>> Now there may not be any good way to keep this benefit in a
>>>>>> declarative approach such as YAML (or at least a good easy way - we
>>>>>> could always allow the user to pass in a SQL expression to extract the
>>>>>> filename from the record!), but we should keep in mind that this might
>>>>>> mean that YAML-generated pipelines will be less efficient for certain
>>>>>> use cases.
>>>>>>
>>>>>
>>>>> Yep, it's not as straightforward to do in a declarative way. I would
>>>>> like to avoid mixing UDFs (with their associated languages and execution
>>>>> environments) if possible.
>>>>> Though I'd like the performance of a "straightforward" YAML pipeline
>>>>> to be that which one can get writing straight-line Java (and possibly
>>>>> better, if we can leverage the structure of schemas everywhere), this
>>>>> is not an absolute requirement for all features.
>>>>>
>>>>> I wonder if separating out a constant prefix vs. the dynamic stuff
>>>>> could be sufficient to mitigate the blow-up of pre-computing this in most
>>>>> cases (especially in the context of a larger pipeline). Alternatively,
>>>>> rather than just a sharding pattern, one could have a full filepattern
>>>>> that includes format parameters for dynamically computed bits as well as
>>>>> the shard number, windowing info, etc. (There are pros and cons to this.)
>>>>>
>>>>>
>>>>>> On Mon, Oct 9, 2023 at 12:37 PM Robert Bradshaw via dev <
>>>>>> dev@beam.apache.org> wrote:
>>>>>>
>>>>>>> Currently the various file writing configurations take a single
>>>>>>> parameter, path, which indicates where the (sharded) output should be
>>>>>>> placed. In other words, one can write something like
>>>>>>>
>>>>>>> pipeline:
>>>>>>>   ...
>>>>>>>   sink:
>>>>>>>     type: WriteToParquet
>>>>>>>     config:
>>>>>>>       path: /beam/filesystem/dest
>>>>>>>
>>>>>>> and one gets files like "/beam/filesystem/dest-X-of-N"
>>>>>>>
>>>>>>> Of course, in practice file writing is often much more complicated
>>>>>>> than this (especially when it comes to Streaming). For reference, I've
>>>>>>> included links to our existing offerings in the various SDKs below. I'd
>>>>>>> like to start a discussion about what else should go in the "config"
>>>>>>> parameter and how it should be expressed in YAML.
>>>>>>>
>>>>>>> The primary concern is around naming. This can generally be split
>>>>>>> into (1) the prefix, which must be provided by the users, (2) the
>>>>>>> sharding information, which includes both shard counts (e.g.
>>>>>>> the -X-of-N suffix) but also windowing information (for streaming
>>>>>>> pipelines), which we may want to allow the user to customize the
>>>>>>> formatting of, and (3) a suffix like .json or .avro that is useful for
>>>>>>> both humans and tooling and can often be inferred but should allow
>>>>>>> customization as well.
>>>>>>>
>>>>>>> An interesting case is that of dynamic destinations, where the
>>>>>>> prefix (or other parameters) may themselves be functions of the
>>>>>>> records. (I am excluding the case where the format itself is
>>>>>>> variable--such cases are probably better handled by explicitly
>>>>>>> partitioning the data and doing multiple writes, as this introduces
>>>>>>> significant complexities and the set of possible formats is generally
>>>>>>> finite and known ahead of time.) I propose that we leverage the fact
>>>>>>> that we have structured data to be able to pull out these dynamic
>>>>>>> parameters. For example, if we have an input data set with a string
>>>>>>> column my_col we could allow something like
>>>>>>>
>>>>>>> config:
>>>>>>>   path: {dynamic: my_col}
>>>>>>>
>>>>>>> which would pull this information out at runtime. (With the
>>>>>>> MapToFields transform, it is very easy to compute/append additional
>>>>>>> fields to existing records.) Generally this field would then be
>>>>>>> stripped from the written data, which would only see the subset of
>>>>>>> non-dynamically referenced columns (though this could be configurable:
>>>>>>> we could add an attribute like {dynamic: my_col, keep: true} or require
>>>>>>> the set of columns to be actually written (or elided) to be enumerated
>>>>>>> in the config or allow/require the actual data to be written to be in a
>>>>>>> designated field of the "full" input records as arranged by a preceding
>>>>>>> transform).
>>>>>>> It'd be great to get input/impressions from a wide range of people
>>>>>>> here on what would be the most natural. Often just writing out snippets
>>>>>>> of various alternatives can be quite informative (though I'm avoiding
>>>>>>> putting them here for the moment to avoid biasing ideas right off the
>>>>>>> bat).
>>>>>>>
>>>>>>> For streaming pipelines it is often essential to write data out in a
>>>>>>> time-partitioned manner. The typical way to do this is to add the
>>>>>>> windowing information into the shard specification itself, and a (set
>>>>>>> of) file(s) is written on each window closing. Beam YAML already
>>>>>>> supports any transform being given a "windowing" configuration, which
>>>>>>> will cause a WindowInto transform to be applied to its input(s) before
>>>>>>> application, and which can sit naturally on a sink. We may want to
>>>>>>> consider if non-windowed writes make sense as well (though how this
>>>>>>> interacts with the watermark and underlying implementations is a large
>>>>>>> open question, so this is a larger change that might make sense to
>>>>>>> defer).
>>>>>>>
>>>>>>> Note that I am explicitly excluding "coders" here. All data in YAML
>>>>>>> should be schema'd, and writers should know how to write this structured
>>>>>>> data. We may want to allow a "schema" field to allow a user to specify
>>>>>>> the desired schema in a manner compatible with the sink format itself
>>>>>>> (e.g. avro, json, whatever) that could be used both for validation and
>>>>>>> possibly resolving ambiguities (e.g. if the sink has an enum format
>>>>>>> that is not expressed in the schema of the input PCollection).
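
To make the {dynamic: my_col} proposal above concrete, here is a minimal Python sketch of the intended semantics on plain dicts: the destination is read from a column at runtime, and that column is stripped from the written data unless a keep flag is set. partition_by_column and the sample records are hypothetical; a real sink would operate on schema'd PCollections rather than in-memory lists.

```python
from collections import defaultdict


def partition_by_column(records, column, keep=False):
    """Group records by records[column]; drop that column unless keep=True."""
    groups = defaultdict(list)
    for record in records:
        destination = record[column]
        if keep:
            payload = dict(record)
        else:
            # Default behavior in the proposal: the dynamic field is not written.
            payload = {k: v for k, v in record.items() if k != column}
        groups[destination].append(payload)
    return dict(groups)


# Hypothetical input with a string column my_col holding the destination.
records = [
    {"my_col": "/out/a", "value": 1},
    {"my_col": "/out/b", "value": 2},
    {"my_col": "/out/a", "value": 3},
]

by_dest = partition_by_column(records, "my_col")
kept = partition_by_column(records, "my_col", keep=True)
```

Note that this is exactly the pre-computed form Reuven describes: the full destination travels with every record, which is where the record blow-up concern comes from.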
>>>>>>>
>>>>>>> Some other configuration options are that some formats (especially
>>>>>>> text-based ones) allow for specification of an external compression
>>>>>>> type (which may be inferable from the suffix), whether to write a
>>>>>>> single shard if the input collection is empty or no shards at all (an
>>>>>>> occasional user request that's supported for some Beam sinks now),
>>>>>>> whether to allow fixed sharding (generally discouraged, as it disables
>>>>>>> things like automatic sharding based on input size, let alone dynamic
>>>>>>> work rebalancing, though sometimes this is useful if the input is known
>>>>>>> to be small and a single output is desired regardless of the
>>>>>>> restriction in parallelism), or other sharding parameters (e.g.
>>>>>>> limiting the number of total elements or (approximately) total number
>>>>>>> of bytes per output shard). Some of these options may not be
>>>>>>> available/implemented for all formats--consideration should be given as
>>>>>>> to how to handle this inconsistency (runtime errors for unsupported
>>>>>>> combinations or simply not allowing them on any until all are
>>>>>>> supported).
>>>>>>>
>>>>>>> A final consideration: we do not anticipate exposing the full
>>>>>>> complexity of Beam in the YAML offering. For advanced users, using a
>>>>>>> "real" SDK will often be preferable, and we intend to provide a
>>>>>>> migration path from YAML to a language of your choice (codegen). So we
>>>>>>> should balance simplicity with completeness and utility here.
>>>>>>>
>>>>>>> Sure, we could just pick something, but given that the main point of
>>>>>>> YAML is not capability, but expressibility and ease-of-use, I think it's
>>>>>>> worth trying to get the expression of these concepts right. I'm sure
>>>>>>> many of you have written a pipeline to files at some point in time; I'd
>>>>>>> welcome any thoughts anyone has on the matter.
>>>>>>>
>>>>>>> - Robert
>>>>>>>
>>>>>>>
>>>>>>> P.S. A related consideration: how should we distinguish the plain
>>>>>>> Read (where the file pattern is given at pipeline construction) from
>>>>>>> the ReadAll variants? Should they be separate transforms, or should we
>>>>>>> instead allow the same named transform (e.g. ReadFromParquet) to
>>>>>>> support both modes, depending on whether an input PCollection or
>>>>>>> explicit file path is given (the two being mutually exclusive, with
>>>>>>> exactly one required, and good error messaging of course)?
>>>>>>>
>>>>>>>
>>>>>>> Java:
>>>>>>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/TextIO.Write.html
>>>>>>> Python:
>>>>>>> https://beam.apache.org/releases/pydoc/current/apache_beam.io.textio.html#apache_beam.io.textio.WriteToText
>>>>>>> Go:
>>>>>>> https://pkg.go.dev/github.com/apache/beam/sdks/go/pkg/beam/io/textio#Write
>>>>>>> Typescript:
>>>>>>> https://beam.apache.org/releases/typedoc/current/functions/io_textio.writeToText.html
>>>>>>> Scio:
>>>>>>> https://spotify.github.io/scio/api/com/spotify/scio/io/TextIO$$WriteParam.html
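
For reference, the three naming pieces from the original message (a user-provided prefix, the shard information, and a suffix) compose roughly as in the Python sketch below. shard_name is a hypothetical helper, not a Beam API, and the zero-padded five-digit layout is an assumption chosen to resemble the "-X-of-N" pattern mentioned in the thread.

```python
def shard_name(prefix, shard, num_shards, suffix=""):
    """Build '<prefix>-<shard>-of-<num_shards><suffix>' with zero padding."""
    if not 0 <= shard < num_shards:
        raise ValueError(f"shard {shard} out of range for {num_shards} shards")
    return f"{prefix}-{shard:05d}-of-{num_shards:05d}{suffix}"


# All file names for a three-shard write under one prefix.
names = [shard_name("/beam/filesystem/dest", i, 3, ".parquet") for i in range(3)]
for name in names:
    print(name)
```

Only the prefix and suffix are user-controlled here; the middle part is generated, which matches the prefix/suffix configuration suggested earlier in the thread.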