On Tue, Oct 10, 2023 at 7:21 AM Kenneth Knowles <k...@apache.org> wrote:
> Another perspective:
>
> We should focus on the fact that FileIO writes what I would call a "big
> file-based dataset" to a filesystem. The primary characteristic of a "big
> file-based dataset" is that it is sharded and that the shards should not
> have any individual distinctiveness. The dataset should be read and written
> as a whole, and the shards are an implementation detail for performance.
>
> This impacts dynamic destinations in two ways that I can think of right
> away:
>
> - It is critical by definition to be able to refer to a whole "big
> file-based dataset" as a whole thing. The most obvious way would be for it
> to exist as a folder or folder-like grouping of files, and you glob
> everything underneath that. But the hard requirement is that there is
> *some* way to refer to the dataset as a single entity (via glob,
> basically).
>
> - When using "dynamic destinations" style functionality, each of the
> dynamic destinations is a "big file-based dataset". The purpose is to route
> a datum to the correct one of these, NOT to route the datum to a particular
> file (which would be just some anonymous shard in the dataset).
>
> So having really fine-grained control over filenames is likely to result
> in anti-patterns of malformed datasets that cannot be easily globbed or, in
> the converse case, well-formed datasets that have suboptimal sharding
> because it was manually managed.
>
> I know that "reality" is not this simple, because people have accumulated
> files they have to work with as-is, where they probably didn't plan for
> this way of thinking when they were gathering the files. We need to give
> good options for everyone, but the golden path should be the simple and
> good case.
>

Yeah. I think both are valid perspectives (especially when trying to fit in
with the often messy setups that exist today).

> On Tue, Oct 10, 2023 at 10:09 AM Kenneth Knowles <k...@apache.org> wrote:
>
>> Since I've been in GHA files lately...
>>
>> I think they have a very useful pattern which we could borrow from or
>> learn from, where setting up the variables happens separately, like
>> https://github.com/apache/beam/blob/57821c191d322f9f21c01a34c55e0c40eda44f1e/.github/workflows/build_release_candidate.yml#L270
>>
>> We could call the section "vars", and then the config could use the vars
>> in the destination. I'm making this example deliberately a little gross:
>>
>> - vars:
>>     - USER_REGION: $.user.metadata.region
>>     - USER_GROUP: $.user.groups[0].name
>> - config:
>>     - path:
>>         gs://output-bucket-${vars.USER_REGION}/files/${vars.USER_GROUP}-${fileio.SHARD_NUM}-${fileio.WINDOW}
>>
>> I think it strikes a good balance between arbitrary lambdas and just a
>> prefix/suffix control, giving a really easy place where we can say "the
>> whole value of this YAML field is a path expression into the structured
>> data"
>>
>

Would we want to impose any constraints here? E.g. what about
"gs://output-bucket-${fileio.SHARD_NUM}/foo/file-${vars.USER_GROUP}.txt"?
What if the shard number (or windowing information) is omitted? Would we
automatically (or optionally) exclude vars from the written data? I like the
idea of providing string templates as the "function", but things may be less
error-prone if we can automatically insert things.

> Kenn
>>
>> On Mon, Oct 9, 2023 at 6:09 PM Chamikara Jayalath via dev <dev@beam.apache.org> wrote:
>>
>>> I would say:
>>>
>>> sink:
>>>   type: WriteToParquet
>>>   config:
>>>     path: /beam/filesystem/dest
>>>     prefix: <my prefix>
>>>     suffix: <my suffix>
>>>
>>> The underlying SDK will add the middle part of the file names to make
>>> sure that files generated by various bundles/windows/shards do not
>>> conflict.
>>>
>>> This will satisfy the vast majority of use-cases I believe. Fully
>>> customizing the file pattern sounds like a more advanced use case that can
>>> be left for "real" SDKs.
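
To make the prefix/suffix proposal above concrete, here is a minimal Python sketch of how the user-supplied parts and the SDK-supplied middle part might combine. It is not Beam's implementation: the function name `shard_file_name`, the zero-padded `SSSSS-of-NNNNN` middle, and the placement of the window string are all assumptions made for illustration.

```python
from typing import Optional


def shard_file_name(path: str, prefix: str, suffix: str,
                    shard: int, num_shards: int,
                    window: Optional[str] = None) -> str:
    """Build one output file name (hypothetical helper, not a Beam API).

    The user controls only path, prefix and suffix; the middle part is
    generated so that files from different shards/windows cannot collide.
    """
    if not 0 <= shard < num_shards:
        raise ValueError("shard must be in [0, num_shards)")
    middle = f"{shard:05d}-of-{num_shards:05d}"
    if window is not None:
        # Assumed convention: window information precedes the shard info.
        middle = f"{window}-{middle}"
    return f"{path.rstrip('/')}/{prefix}-{middle}{suffix}"


if __name__ == "__main__":
    print(shard_file_name("/beam/filesystem/dest", "part", ".parquet", 3, 10))
```

Because every file under `path` shares the same prefix and suffix, the whole dataset stays addressable with a single glob such as `/beam/filesystem/dest/part-*.parquet`.
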
>>>
>>> For dynamic destinations, I think just making the "path" component
>>> support a lambda that is parameterized by the input should be adequate
>>> since this allows customers to direct files written to different
>>> destination directories.
>>>
>>> sink:
>>>   type: WriteToParquet
>>>   config:
>>>     path: <destination lambda>
>>>     prefix: <my prefix>
>>>     suffix: <my suffix>
>>>
>>> I'm not sure what would be the best way to specify a lambda here though.
>>> Maybe a regex or the name of a Python callable?
>>>
>>> Thanks,
>>> Cham
>>>
>>> On Mon, Oct 9, 2023 at 2:06 PM Robert Bradshaw via dev <dev@beam.apache.org> wrote:
>>>
>>>> On Mon, Oct 9, 2023 at 1:49 PM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> Just FYI - the reason why names (including prefixes) in
>>>>> DynamicDestinations were parameterized via a lambda instead of just
>>>>> having the user add it via MapElements is performance. We discussed
>>>>> something along the lines of what you are suggesting (essentially
>>>>> having the user create a KV where the key contained the dynamic
>>>>> information). The problem was that the generated filepath was often
>>>>> much larger (sometimes by 2 orders of magnitude) than the information
>>>>> in the record, and there was a desire to avoid record blowup. E.g. the
>>>>> record might contain a single integer userid, and the filepath prefix
>>>>> would then be /long/path/to/output/users/<id>. This was especially bad
>>>>> in cases where the data had to be shuffled, and the existing dynamic
>>>>> destinations method allowed extracting the filepath only _after_ the
>>>>> shuffle.
>>>>>
>>>>
>>>> That is a consideration I hadn't thought much about, thanks for
>>>> bringing this up.
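
The record-blowup point can be illustrated with a small sketch in plain Python (no Beam APIs). It groups records by the compact key first and only derives the long destination path afterwards, once per group. The names `destination`, `group_by_compact_key` and `plan_writes`, and the record shape with a `userid` field, are assumptions for illustration; the in-memory grouping merely stands in for a shuffle.

```python
from collections import defaultdict

# Constant prefix taken from the example in the thread.
PREFIX = "/long/path/to/output/users/"


def destination(user_id: int) -> str:
    """Expand a compact key into the full (much longer) output path."""
    return f"{PREFIX}{user_id}"


def group_by_compact_key(records):
    """Stand-in for a shuffle: only the small integer key travels with
    each record, never the expanded path."""
    groups = defaultdict(list)
    for record in records:
        groups[record["userid"]].append(record)
    return groups


def plan_writes(records):
    """Derive the path once per group, after grouping, not once per record."""
    return {destination(key): group
            for key, group in group_by_compact_key(records).items()}


if __name__ == "__main__":
    data = [{"userid": 1, "v": "a"}, {"userid": 2, "v": "b"},
            {"userid": 1, "v": "c"}]
    for path, group in plan_writes(data).items():
        print(path, len(group))
```

Computing the path in a preceding map step instead would attach the full string to every record before grouping, which is the blow-up described above.
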
>>>>
>>>>> Now there may not be any good way to keep this benefit in a
>>>>> declarative approach such as YAML (or at least a good easy way - we
>>>>> could always allow the user to pass in a SQL expression to extract the
>>>>> filename from the record!), but we should keep in mind that this might
>>>>> mean that YAML-generated pipelines will be less efficient for certain
>>>>> use cases.
>>>>>
>>>>
>>>> Yep, it's not as straightforward to do in a declarative way. I would
>>>> like to avoid mixing UDFs (with their associated languages and execution
>>>> environments) if possible. Though I'd like the performance of a
>>>> "straightforward" YAML pipeline to be that which one can get writing
>>>> straight-line Java (and possibly better, if we can leverage the structure
>>>> of schemas everywhere), this is not an absolute requirement for all
>>>> features.
>>>>
>>>> I wonder if separating out a constant prefix vs. the dynamic stuff
>>>> could be sufficient to mitigate the blow-up of pre-computing this in most
>>>> cases (especially in the context of a larger pipeline). Alternatively,
>>>> rather than just a sharding pattern, one could have a full filepattern
>>>> that includes format parameters for dynamically computed bits as well as
>>>> the shard number, windowing info, etc. (There are pros and cons to this.)
>>>>
>>>>
>>>>> On Mon, Oct 9, 2023 at 12:37 PM Robert Bradshaw via dev <dev@beam.apache.org> wrote:
>>>>>
>>>>>> Currently the various file writing configurations take a single
>>>>>> parameter, path, which indicates where the (sharded) output should be
>>>>>> placed. In other words, one can write something like
>>>>>>
>>>>>> pipeline:
>>>>>>   ...
>>>>>>   sink:
>>>>>>     type: WriteToParquet
>>>>>>     config:
>>>>>>       path: /beam/filesystem/dest
>>>>>>
>>>>>> and one gets files like "/beam/filesystem/dest-X-of-N"
>>>>>>
>>>>>> Of course, in practice file writing is often much more complicated
>>>>>> than this (especially when it comes to streaming).
>>>>>> For reference, I've included links to our existing offerings in the
>>>>>> various SDKs below. I'd like to start a discussion about what else
>>>>>> should go in the "config" parameter and how it should be expressed in
>>>>>> YAML.
>>>>>>
>>>>>> The primary concern is around naming. This can generally be split
>>>>>> into (1) the prefix, which must be provided by the users, (2) the
>>>>>> sharding information, which includes both shard counts (e.g. the
>>>>>> -X-of-N suffix) and windowing information (for streaming pipelines),
>>>>>> the formatting of which we may want to allow the user to customize,
>>>>>> and (3) a suffix like .json or .avro that is useful for both humans
>>>>>> and tooling and can often be inferred but should allow customization
>>>>>> as well.
>>>>>>
>>>>>> An interesting case is that of dynamic destinations, where the prefix
>>>>>> (or other parameters) may themselves be functions of the records.
>>>>>> (I am excluding the case where the format itself is variable--such
>>>>>> cases are probably better handled by explicitly partitioning the data
>>>>>> and doing multiple writes, as this introduces significant complexities
>>>>>> and the set of possible formats is generally finite and known ahead of
>>>>>> time.) I propose that we leverage the fact that we have structured
>>>>>> data to be able to pull out these dynamic parameters. For example, if
>>>>>> we have an input data set with a string column my_col we could allow
>>>>>> something like
>>>>>>
>>>>>> config:
>>>>>>   path: {dynamic: my_col}
>>>>>>
>>>>>> which would pull this information out at runtime. (With the
>>>>>> MapToFields transform, it is very easy to compute/append additional
>>>>>> fields to existing records.)
>>>>>> Generally this field would then be stripped from the written data,
>>>>>> which would only see the subset of non-dynamically referenced columns
>>>>>> (though this could be configurable: we could add an attribute like
>>>>>> {dynamic: my_col, Keep: true}, or require the set of columns to be
>>>>>> actually written (or elided) to be enumerated in the config, or
>>>>>> allow/require the actual data to be written to be in a designated
>>>>>> field of the "full" input records as arranged by a preceding
>>>>>> transform). It'd be great to get input/impressions from a wide range
>>>>>> of people here on what would be the most natural. Often just writing
>>>>>> out snippets of various alternatives can be quite informative (though
>>>>>> I'm avoiding putting them here for the moment to avoid biasing ideas
>>>>>> right off the bat).
>>>>>>
>>>>>> For streaming pipelines it is often essential to write data out in a
>>>>>> time-partitioned manner. The typical way to do this is to add the
>>>>>> windowing information into the shard specification itself, and a (set
>>>>>> of) file(s) is written on each window closing. Beam YAML already
>>>>>> supports any transform being given a "windowing" configuration, which
>>>>>> will cause a WindowInto transform to be applied to its input(s) before
>>>>>> application, and which can sit naturally on a sink. We may want to
>>>>>> consider if non-windowed writes make sense as well (though how this
>>>>>> interacts with the watermark and underlying implementations is a large
>>>>>> open question, so this is a larger change that might make sense to
>>>>>> defer).
>>>>>>
>>>>>> Note that I am explicitly excluding "coders" here. All data in YAML
>>>>>> should be schema'd, and writers should know how to write this
>>>>>> structured data. We may want to allow a "schema" field to allow a user
>>>>>> to specify the desired schema in a manner compatible with the sink
>>>>>> format itself (e.g.
>>>>>> avro, json, whatever) that could be used both for validation and
>>>>>> possibly resolving ambiguities (e.g. if the sink has an enum format
>>>>>> that is not expressed in the schema of the input PCollection).
>>>>>>
>>>>>> Other configuration options include the external compression type
>>>>>> that some formats (especially text-based ones) allow (which may be
>>>>>> inferable from the suffix), whether to write a single shard or no
>>>>>> shards at all if the input collection is empty (an occasional user
>>>>>> request that's supported for some Beam sinks now), whether to allow
>>>>>> fixed sharding (generally discouraged, as it disables things like
>>>>>> automatic sharding based on input size, let alone dynamic work
>>>>>> rebalancing, though sometimes this is useful if the input is known to
>>>>>> be small and a single output is desired regardless of the restriction
>>>>>> in parallelism), and other sharding parameters (e.g. limiting the
>>>>>> number of total elements or (approximately) total number of bytes per
>>>>>> output shard). Some of these options may not be available/implemented
>>>>>> for all formats--consideration should be given as to how to handle
>>>>>> this inconsistency (runtime errors for unsupported combinations or
>>>>>> simply not allowing them on any until all are supported).
>>>>>>
>>>>>> A final consideration: we do not anticipate exposing the full
>>>>>> complexity of Beam in the YAML offering. For advanced users, using a
>>>>>> "real" SDK will often be preferable, and we intend to provide a
>>>>>> migration path from YAML to a language of your choice (codegen). So we
>>>>>> should balance simplicity with completeness and utility here.
>>>>>>
>>>>>> Sure, we could just pick something, but given that the main point of
>>>>>> YAML is not capability, but expressibility and ease-of-use, I think
>>>>>> it's worth trying to get the expression of these concepts right.
>>>>>> I'm sure many of you have written a pipeline that writes to files at
>>>>>> some point in time; I'd welcome any thoughts anyone has on the matter.
>>>>>>
>>>>>> - Robert
>>>>>>
>>>>>>
>>>>>> P.S. A related consideration: how should we distinguish the plain
>>>>>> Read (where the file pattern is given at pipeline construction) from
>>>>>> the ReadAll variants? Should they be separate transforms, or should we
>>>>>> instead allow the same named transform (e.g. ReadFromParquet) to
>>>>>> support both modes, depending on whether an input PCollection or
>>>>>> explicit file path is given (the two being mutually exclusive, with
>>>>>> exactly one required, and good error messaging of course)?
>>>>>>
>>>>>>
>>>>>> Java:
>>>>>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/TextIO.Write.html
>>>>>> Python:
>>>>>> https://beam.apache.org/releases/pydoc/current/apache_beam.io.textio.html#apache_beam.io.textio.WriteToText
>>>>>> Go:
>>>>>> https://pkg.go.dev/github.com/apache/beam/sdks/go/pkg/beam/io/textio#Write
>>>>>> Typescript:
>>>>>> https://beam.apache.org/releases/typedoc/current/functions/io_textio.writeToText.html
>>>>>> Scio:
>>>>>> https://spotify.github.io/scio/api/com/spotify/scio/io/TextIO$$WriteParam.html
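
As a rough illustration of the "vars" plus string-template idea from earlier in the thread, here is a minimal Python sketch. Nothing in it is an existing Beam YAML feature: `lookup`, `expand_path`, the tiny supported subset of path expressions (`.field` and `[index]` only), and the rule that `${fileio.SHARD_NUM}` is mandatory are all assumptions. Rejecting a template without the shard number is just one possible answer to the "what if it is omitted?" question; inserting it automatically would be another.

```python
import re

_PLACEHOLDER = re.compile(r"\$\{(\w+)\.(\w+)\}")   # e.g. ${vars.USER_GROUP}
_STEP = re.compile(r"\.(\w+)|\[(\d+)\]")           # .field or [index]


def lookup(record, expr):
    """Resolve a tiny subset of path expressions, e.g. $.user.groups[0].name."""
    if not expr.startswith("$"):
        raise ValueError(f"expression must start with '$': {expr!r}")
    value, pos = record, 1
    while pos < len(expr):
        step = _STEP.match(expr, pos)
        if step is None:
            raise ValueError(f"unsupported expression: {expr!r}")
        field, index = step.groups()
        value = value[field] if field is not None else value[int(index)]
        pos = step.end()
    return value


def expand_path(template, var_exprs, record, fileio):
    """Fill ${vars.*} from the record and ${fileio.*} from writer-supplied values."""
    used = set(_PLACEHOLDER.findall(template))
    if ("fileio", "SHARD_NUM") not in used:
        # Assumed policy: without the shard number, shards would collide.
        raise ValueError("template must include ${fileio.SHARD_NUM}")
    namespaces = {
        "vars": {name: lookup(record, expr) for name, expr in var_exprs.items()},
        "fileio": fileio,
    }
    return _PLACEHOLDER.sub(
        lambda m: str(namespaces[m.group(1)][m.group(2)]), template)


if __name__ == "__main__":
    record = {"user": {"metadata": {"region": "eu"},
                       "groups": [{"name": "admins"}]}}
    var_exprs = {"USER_REGION": "$.user.metadata.region",
                 "USER_GROUP": "$.user.groups[0].name"}
    template = ("gs://output-bucket-${vars.USER_REGION}/files/"
                "${vars.USER_GROUP}-${fileio.SHARD_NUM}-${fileio.WINDOW}")
    print(expand_path(template, var_exprs, record,
                      {"SHARD_NUM": 3, "WINDOW": "w0"}))
```

A check like the one in `expand_path` could also be extended to require that the shard number appears only in the final path component, which would rule out patterns like `gs://output-bucket-${fileio.SHARD_NUM}/...` that cannot be globbed as one dataset.
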