In general, Beam only deals with keys and grouping by key. I think expanding this idea to some more abstract notion of a sharding function could make sense.
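As a rough illustration of that idea, a model-level sharding abstraction might look something like the sketch below. The interface name and signature are hypothetical, not an existing Beam API as of this thread.

    import java.io.Serializable;

    // Hypothetical sketch only: the name and signature are assumptions,
    // not an existing Beam API. The idea is to generalize "group by key"
    // into "assign to shard", with the assignment derived from the
    // element itself rather than chosen at random.
    public interface ShardingFunction<UserT, DestinationT> extends Serializable {

      // Assigns an element (already routed to a destination, as in
      // FileIO.writeDynamic()) to one of shardCount shards. A deterministic
      // implementation makes shard contents reproducible across retries.
      int assignShard(DestinationT destination, UserT element, int shardCount);
    }

A deterministic implementation of such a function is what both of Jozef's use cases below call for: data-derived assignment for Hive bucketing, and stable assignment under Spark stage recomputation.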
For FileIO specifically, I wonder if you can use writeDynamic() to get the behavior you are seeking.

Kenn

On Tue, Jun 15, 2021 at 3:49 PM Tyson Hamilton <tyso...@google.com> wrote:

> Adding sharding to the model may require a wider discussion than FileIO
> alone. I'm not entirely sure how wide, or whether this has been proposed
> before, but IMO it warrants a design doc or proposal.
>
> A couple of high-level questions I can think of are:
>
> - Which runners support sharding?
>   * There will be some work required in Dataflow to support this, but
>     I'm not sure how much.
> - What does sharding mean for streaming pipelines?
>
> A more nitty-gritty question:
>
> - How can this be achieved performantly? For example, if a shuffle is
> required to achieve a particular sharding constraint, should we allow
> transforms to declare that they don't modify the sharding property (e.g.
> key preserving)? That would let a runner avoid an additional shuffle
> when a preceding shuffle already guarantees the sharding requirements.
>
> Where X is the shuffle that could be avoided:
>
> input -> shuffle (key sharding fn A) -> transform1 (key preserving) ->
> transform2 (key preserving) -> X -> fileio (key sharding fn A)
>
> On Tue, Jun 15, 2021 at 1:02 AM Jozef Vilcek <jozo.vil...@gmail.com>
> wrote:
>
>> I would like to extend FileIO with the possibility to specify a custom
>> sharding function:
>> https://issues.apache.org/jira/browse/BEAM-12493
>>
>> I have two use cases for this:
>>
>> 1. I need to generate shards that are compatible with Hive bucketing,
>> and therefore need to decide shard assignment based on data fields of
>> the input element.
>> 2. When running e.g. on Spark and the job encounters a failure that
>> causes a loss of some data from previous stages, Spark recomputes the
>> necessary tasks in the necessary stages to recover the data. Because
>> the default shard assignment function is random, some data will end up
>> in different shards and cause duplicates in the final output.
>>
>> Please let me know your thoughts in case you see a reason not to add
>> such an improvement.
>>
>> Thanks,
>> Jozef
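To make Kenn's writeDynamic() suggestion concrete, here is a minimal sketch of Jozef's Hive-bucketing use case. The Record type, its getBucketColumn() and toLine() methods, the bucket count, and the output path are all hypothetical placeholders.

    import org.apache.beam.sdk.coders.VarIntCoder;
    import org.apache.beam.sdk.io.FileIO;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.transforms.Contextful;
    import org.apache.beam.sdk.values.PCollection;

    final int numBuckets = 32;          // hypothetical Hive bucket count
    PCollection<Record> records = ...;  // assumed input

    records.apply(
        FileIO.<Integer, Record>writeDynamic()
            // Deterministic, data-derived destination, mirroring Hive's
            // bucketing scheme: (hash(column) & Integer.MAX_VALUE) % buckets.
            // Because it is not random, a recomputed Spark stage reproduces
            // the same assignment, avoiding the duplicates in use case 2.
            .by(r -> (r.getBucketColumn().hashCode() & Integer.MAX_VALUE) % numBuckets)
            .withDestinationCoder(VarIntCoder.of())
            .via(Contextful.fn(Record::toLine), TextIO.sink())
            .to("/path/to/output")      // hypothetical output location
            .withNaming(b -> FileIO.Write.defaultNaming("bucket-" + b, ".txt"))
            .withNumShards(1));         // one file per bucket per pane

Note that this steers the destination (one file group per bucket) rather than shard numbering within a single destination, so it approximates a custom sharding function without replacing one; that remaining gap is what BEAM-12493 asks to close.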