On Tue, Oct 10, 2023 at 3:53 PM Chamikara Jayalath <chamik...@google.com>
wrote:

>
> On Tue, Oct 10, 2023 at 3:41 PM Reuven Lax <re...@google.com> wrote:
>
>> I suspect some simple pattern templating would solve most use cases. We
>> probably would want to support timestamp formatting (e.g. $YYYY $M $D) as
>> well.
>>
>> On Tue, Oct 10, 2023 at 3:35 PM Robert Bradshaw <rober...@google.com>
>> wrote:
>>
>>> On Mon, Oct 9, 2023 at 3:09 PM Chamikara Jayalath <chamik...@google.com>
>>> wrote:
>>>
>>>> I would say:
>>>>
>>>>     sink:
>>>>       type: WriteToParquet
>>>>       config:
>>>>         path: /beam/filesytem/dest
>>>>         prefix: <my prefix>
>>>>         suffix: <my suffix>
>>>>
>>>> Underlying SDK will add the middle part of the file names to make sure
>>>> that files generated by various bundles/windows/shards do not conflict.
>>>>
>>>
>>> What's the relationship between path and prefix? Is path the
>>> directory part of the full path, or does prefix precede it?
>>>
>>
> prefix would be the first part of the file name so each shard will be
> named.
> <path>/<prefix>-<shard components added by the runner>-<suffix>
>
> This is similar to what we do in existing SDKS. For example, Java FileIO,
>
>
> https://github.com/apache/beam/blob/65eaf45026e9eeb61a9e05412488e5858faec6de/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L187
>

Yeah, although there's no distinction between path and prefix.


>>>
>>>> This will satisfy the vast majority of use-cases I believe. Fully
>>>> customizing the file pattern sounds like a more advanced use case that can
>>>> be left for "real" SDKs.
>>>>
>>>
>>> Yea, we don't have to do everything.
>>>
>>>
>>>> For dynamic destinations, I think just making the "path" component
>>>> support  a lambda that is parameterized by the input should be adequate
>>>> since this allows customers to direct files written to different
>>>> destination directories.
>>>>
>>>>     sink:
>>>>       type: WriteToParquet
>>>>       config:
>>>>         path: <destination lambda>
>>>>         prefix: <my prefix>
>>>>         suffix: <my suffix>
>>>>
>>>> I'm not sure what would be the best way to specify a lambda here
>>>> though. Maybe a regex or the name of a Python callable ?
>>>>
>>>
>>> I'd rather not require Python for a pure Java pipeline, but some kind of
>>> a pattern template may be sufficient here.
>>>
>>>
>>>> On Mon, Oct 9, 2023 at 2:06 PM Robert Bradshaw via dev <
>>>> dev@beam.apache.org> wrote:
>>>>
>>>>> .On Mon, Oct 9, 2023 at 1:49 PM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> Just FYI - the reason why names (including prefixes) in
>>>>>> DynamicDestinations were parameterized via a lambda instead of just 
>>>>>> having
>>>>>> the user add it via MapElements is performance. We discussed something
>>>>>> along the lines of what you are suggesting (essentially having the user
>>>>>> create a KV where the key contained the dynamic information). The problem
>>>>>> was that often the size of the generated filepath was often much larger
>>>>>> (sometimes by 2 OOM) than the information in the record, and there was a
>>>>>> desire to avoid record blowup. e.g. the record might contain a single
>>>>>> integer userid, and the filepath prefix would then be
>>>>>> /long/path/to/output/users/<id>. This was especially bad in cases where 
>>>>>> the
>>>>>> data had to be shuffled, and the existing dynamic destinations method
>>>>>> allowed extracting the filepath only _after_  the shuffle.
>>>>>>
>>>>>
>>>>> That is a consideration I hadn't thought much of, thanks for
>>>>> bringing this up.
>>>>>
>>>>>
>>>>>> Now there may not be any good way to keep this benefit in a
>>>>>> declarative approach such as YAML (or at least a good easy way - we could
>>>>>> always allow the user to pass in a SQL expression to extract the filename
>>>>>> from the record!), but we should keep in mind that this might mean that
>>>>>> YAML-generated pipelines will be less efficient for certain use cases.
>>>>>>
>>>>>
>>>>> Yep, it's not as straightforward to do in a declarative way. I would
>>>>> like to avoid mixing UDFs (with their associated languages and execution
>>>>> environments) if possible. Though I'd like the performance of a
>>>>> "straightforward" YAML pipeline to be that which one can get writing
>>>>> straight-line Java (and possibly better, if we can leverage the structure
>>>>> of schemas everywhere) this is not an absolute requirement for all
>>>>> features.
>>>>>
>>>>> I wonder if separating out a constant prefix vs. the dynamic stuff
>>>>> could be sufficient to mitigate the blow-up of pre-computing this in most
>>>>> cases (especially in the context of a larger pipeline). Alternatively,
>>>>> rather than just a sharding pattern, one could have a full filepattern 
>>>>> that
>>>>> includes format parameters for dynamically computed bits as well as the
>>>>> shard number, windowing info, etc. (There are pros and cons to this.)
>>>>>
>>>>>
>>>>>> On Mon, Oct 9, 2023 at 12:37 PM Robert Bradshaw via dev <
>>>>>> dev@beam.apache.org> wrote:
>>>>>>
>>>>>>> Currently the various file writing configurations take a single
>>>>>>> parameter, path, which indicates where the (sharded) output should be
>>>>>>> placed. In other words, one can write something like
>>>>>>>
>>>>>>>   pipeline:
>>>>>>>     ...
>>>>>>>     sink:
>>>>>>>       type: WriteToParquet
>>>>>>>       config:
>>>>>>>         path: /beam/filesytem/dest
>>>>>>>
>>>>>>> and one gets files like "/beam/filesystem/dest-X-of-N"
>>>>>>>
>>>>>>> Of course, in practice file writing is often much more complicated
>>>>>>> than this (especially when it comes to Streaming). For reference, I've
>>>>>>> included links to our existing offerings in the various SDKs below. I'd
>>>>>>> like to start a discussion about what else should go in the "config"
>>>>>>> parameter and how it should be expressed in YAML.
>>>>>>>
>>>>>>> The primary concern is around naming. This can generally be split
>>>>>>> into (1) the prefix, which must be provided by the users (2) the sharing
>>>>>>> information, includes both shard counts (e.g. (the -X-of-N suffix) but 
>>>>>>> also
>>>>>>> windowing information (for streaming pipelines) which we may want to 
>>>>>>> allow
>>>>>>> the user to customize the formatting of, and (3) a suffix like .json or
>>>>>>> .avro that is useful for both humans and tooling and can often be 
>>>>>>> inferred
>>>>>>> but should allow customization as well.
>>>>>>>
>>>>>>> An interesting case is that of dynamic destinations, where the
>>>>>>> prefix (or other parameters) may themselves be functions of the records
>>>>>>> themselves. (I am excluding the case where the format itself is
>>>>>>> variable--such cases are probably better handled by explicitly 
>>>>>>> partitioning
>>>>>>> the data and doing multiple writes, as this introduces significant
>>>>>>> complexities and the set of possible formats is generally finite and 
>>>>>>> known
>>>>>>> ahead of time.) I propose that we leverage the fact that we have 
>>>>>>> structured
>>>>>>> data to be able to pull out these dynamic parameters. For example, if we
>>>>>>> have an input data set with a string column my_col we could allow 
>>>>>>> something
>>>>>>> like
>>>>>>>
>>>>>>>   config:
>>>>>>>     path: {dynamic: my_col}
>>>>>>>
>>>>>>> which would pull this information out at runtime. (With the
>>>>>>> MapToFields transform, it is very easy to compute/append additional 
>>>>>>> fields
>>>>>>> to existing records.) Generally this field would then be stripped from 
>>>>>>> the
>>>>>>> written data, which would only see the subset of non-dynamically 
>>>>>>> referenced
>>>>>>> columns (though this could be configurable: we could add an attribute 
>>>>>>> like
>>>>>>> {dynamic: my_col, Keep: true} or require the set of columns to be 
>>>>>>> actually
>>>>>>> written (or elided) to be enumerated in the config or allow/require the
>>>>>>> actual data to be written to be in a designated field of the "full" 
>>>>>>> input
>>>>>>> records as arranged by a preceding transform). It'd be great to get
>>>>>>> input/impressions from a wide range of people here on what would be the
>>>>>>> most natural. Often just writing out snippets of various alternatives 
>>>>>>> can
>>>>>>> be quite informative (though I'm avoiding putting them here for the 
>>>>>>> moment
>>>>>>> to avoid biasing ideas right off the bat).
>>>>>>>
>>>>>>> For streaming pipelines it is often essential to write data out in a
>>>>>>> time-partitioned manner. The typical way to do this is to add the 
>>>>>>> windowing
>>>>>>> information into the shard specification itself, and a (set of) file(s) 
>>>>>>> is
>>>>>>> written on each window closing. Beam YAML already supports any transform
>>>>>>> being given a "windowing" configuration which will cause a WindowInto
>>>>>>> transform to be applied to its input(s) before application which can sit
>>>>>>> naturally on a sink. We may want to consider if non-windowed writes make
>>>>>>> sense as well (though how this interacts with the watermark and 
>>>>>>> underlying
>>>>>>> implementations are a large open question, so this is a larger change 
>>>>>>> that
>>>>>>> might make sense to defer).
>>>>>>>
>>>>>>> Note that I am explicitly excluding "coders" here. All data in YAML
>>>>>>> should be schema'd, and writers should know how to write this structured
>>>>>>> data. We may want to allow a "schema" field to allow a user to specify 
>>>>>>> the
>>>>>>> desired schema in a manner compatible with the sink format itself (e.g.
>>>>>>> avro, json, whatever) that could be used both for validation and 
>>>>>>> possibly
>>>>>>> resolving ambiguities (e.g. if the sink has an enum format that is not
>>>>>>> expressed in the schema of the input PCollection).
>>>>>>>
>>>>>>> Some other configuration options are that some formats (especially
>>>>>>> text-based ones) allow for specification of an external compression type
>>>>>>> (which may be inferable from the suffix), whether to write a single 
>>>>>>> shard
>>>>>>> if the input collection is empty or no shards at all (an occasional user
>>>>>>> request that's supported for some Beam sinks now), whether to allow 
>>>>>>> fixed
>>>>>>> sharing (generally discouraged, as it disables things like automatic
>>>>>>> shading based on input size, let alone dynamic work rebalancing, though
>>>>>>> sometimes this is useful if the input is known to be small and a single
>>>>>>> output is desired regardless of the restriction in parallelism), or 
>>>>>>> other
>>>>>>> sharding parameters (e.g. limiting the number of total elements or
>>>>>>> (approximately) total number of bytes per output shard). Some of these
>>>>>>> options may not be available/implemented for all formats--consideration
>>>>>>> should be given as to how to handle this inconsistency (runtime errors 
>>>>>>> for
>>>>>>> unsupported combinations or simply not allowing them on any until all 
>>>>>>> are
>>>>>>> supported).
>>>>>>>
>>>>>>> A final consideration: we do not anticipate exposing the full
>>>>>>> complexity of Beam in the YAML offering. For advanced users using a 
>>>>>>> "real"
>>>>>>> SDK will often be preferable, and we intend to provide a migration path
>>>>>>> from YAML to a language of your choice (codegen) as a migration path. 
>>>>>>> So we
>>>>>>> should balance simplicity with completeness and utility here.
>>>>>>>
>>>>>>> Sure, we could just pick something, but given that the main point of
>>>>>>> YAML is not capability, but expressibility and ease-of-use, I think it's
>>>>>>> worth trying to get the expression of these concepts right. I'm sure 
>>>>>>> many
>>>>>>> of you have written a pipeline to files at some point in time; I'd 
>>>>>>> welcome
>>>>>>> any thoughts anyone has on the matter.
>>>>>>>
>>>>>>> - Robert
>>>>>>>
>>>>>>>
>>>>>>> P.S. A related consideration: how should we consider the plain Read
>>>>>>> (where that file pattern is given at pipeline construction) from the
>>>>>>> ReadAll variants? Should they be separate transforms, or should we 
>>>>>>> instead
>>>>>>> allow the same named transform (e.g. ReadFromParquet) support both 
>>>>>>> modes,
>>>>>>> depending on whether an input PCollection or explicit file path is given
>>>>>>> (the two being mutually exclusive, with exactly one required, and good
>>>>>>> error messaging of course)?
>>>>>>>
>>>>>>>
>>>>>>> Java:
>>>>>>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/TextIO.Write.html
>>>>>>> Python:
>>>>>>> https://beam.apache.org/releases/pydoc/current/apache_beam.io.textio.html#apache_beam.io.textio.WriteToText
>>>>>>> Go:
>>>>>>> https://pkg.go.dev/github.com/apache/beam/sdks/go/pkg/beam/io/textio#Write
>>>>>>> Typescript:
>>>>>>> https://beam.apache.org/releases/typedoc/current/functions/io_textio.writeToText.html
>>>>>>> Scio:
>>>>>>> https://spotify.github.io/scio/api/com/spotify/scio/io/TextIO$$WriteParam.html
>>>>>>>
>>>>>>>

Reply via email to