Hi Kenn,

Thanks for the response.
I haven't hit any specific issue yet. I think that if the IO connector implementation takes parallelism into consideration, runners can parallelize the primitive transforms in the connector (key-partitioned for GBK and stateful ParDo, and round-robin for stateless ParDo). For example, TextIO first writes a temp file for every bundle, then uses a void key to prevent parallelism, and then finalizes the result. That should work properly in a distributed environment. But applications can provide any custom IO connector, and the runner does not know whether a connector can be safely parallelized. Can I assume that it is the application's responsibility to make sure its IO connector works correctly when running in parallel?

Thanks,
Shen

On Tue, Apr 3, 2018 at 6:11 PM, Kenneth Knowles <k...@google.com> wrote:

> The runner should generally not need to be aware of any getNumShard() API
> on a connector. The connector itself is probably a composite transform
> (with a ParDo or two or three somewhere doing the actual writes) and should
> be designed to expose available parallelism. Specifying the number of
> shards actually usually limits the parallelism, versus letting the runner
> use the maximum allowed parallelism.
>
> If the connector does a GBK to gather input elements into a single
> iterable, then it is a single element and cannot be processed in parallel
> (except through splittable DoFn, but in that case you may not need to do
> the GBK in the first place). And converse to that, if the connector does
> not do a GBK to gather input elements, then the runner is permitted to
> bundle them any way it wants and process all of them as though in parallel
> (except for stateful DoFn, in which case you probably don't need the GBK).
>
> Bundling is an important way that this works, too, since the @FinishBundle
> method is really a "flush" method, with @ProcessElement perhaps buffering
> up elements to be written to e.g. the same file shard. It is not this
> simple in practice but that gives the idea of how even with unrestricted
> elementwise parallelism you don't get one shard per element.
>
> These are all just ideas, and I'm not the connector expert. But I think
> the TL;DR is that a runner shouldn't need to know this - have you hit
> specific issues with a particular connector? That could make this a very
> productive discussion.
>
> Kenn
>
> On Mon, Apr 2, 2018 at 1:41 PM Shen Li <cs.she...@gmail.com> wrote:
>
>> Hi,
>>
>> It seems that there is no Sink base class. Some IO connectors (e.g.,
>> KafkaIO and TextIO) provide a getNumShard() API. But it is not generally
>> available for all existing Beam IO connectors and potential custom ones.
>> Although some IO connectors are implemented using ParDo/GBK, it is unclear
>> whether the runner can directly parallelize those transforms (e.g., what if
>> it only writes to a single file). Is there a general way for runners to
>> take advantage of sink parallelism?
>>
>> Thanks,
>> Shen
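P.S. To make sure I follow the "@FinishBundle is a flush" point: here is a plain-Java sketch (no Beam dependency, all names like BufferingWriteFn and the shard representation are hypothetical) of how a write DoFn buffers elements in @ProcessElement and emits one shard per bundle at @FinishBundle, so unrestricted elementwise parallelism still does not produce one shard per element.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of the bundle-as-flush idea: processElement() stands in
// for Beam's @ProcessElement and only buffers, finishBundle() stands in for
// @FinishBundle and flushes the buffer as a single shard.
public class BufferingWriteFn {
    private final List<String> buffer = new ArrayList<>();
    private final List<List<String>> shards = new ArrayList<>();

    // Stand-in for @ProcessElement: buffer the element, write nothing yet.
    public void processElement(String element) {
        buffer.add(element);
    }

    // Stand-in for @FinishBundle: flush the buffered elements as one shard.
    public void finishBundle() {
        if (!buffer.isEmpty()) {
            shards.add(new ArrayList<>(buffer)); // stand-in for writing a file shard
            buffer.clear();
        }
    }

    public List<List<String>> getShards() {
        return shards;
    }

    public static void main(String[] args) {
        BufferingWriteFn fn = new BufferingWriteFn();
        // The runner delivers five elements as one bundle...
        for (int i = 0; i < 5; i++) fn.processElement("elem-" + i);
        fn.finishBundle();
        // ...and three more elements as a second bundle.
        for (int i = 5; i < 8; i++) fn.processElement("elem-" + i);
        fn.finishBundle();
        // Two shards total: one per bundle, not one per element.
        System.out.println(fn.getShards().size()); // prints 2
    }
}
```

So the number of shards tracks the runner's bundling decisions, not the element count, which is how the runner keeps control of write parallelism.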
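And here is a plain-Java sketch (again no Beam dependency; writeTempShard and finalizeShards are hypothetical names, not real Beam API) of the TextIO-style pattern I described: temp shards are written in parallel, one per bundle, and the single-void-key grouping means the finalize step runs exactly once, which is why the runner must not parallelize that stage.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Plain-Java sketch of the write-then-finalize pattern: phase 1 is
// embarrassingly parallel across bundles, phase 2 is forced onto a single
// worker by grouping all temp shard names under one (void) key.
public class VoidKeyFinalize {
    // Phase 1: runs once per bundle, in parallel; returns a temp shard name.
    static String writeTempShard(int bundleId, List<String> bundle) {
        return "temp-shard-" + bundleId + "-" + bundle.size() + "records";
    }

    // Phase 2: after the single-key GBK, exactly one call sees every temp
    // shard and commits them; the rename/commit is simulated by renaming.
    static List<String> finalizeShards(Collection<String> tempShards) {
        List<String> finalNames = new ArrayList<>();
        int i = 0;
        for (String shard : tempShards) {
            finalNames.add("final-shard-" + i++); // stand-in for rename/commit
        }
        return finalNames;
    }

    public static void main(String[] args) {
        List<List<String>> bundles = List.of(
            List.of("a", "b"), List.of("c"), List.of("d", "e", "f"));
        // Parallel phase (simulated sequentially here): one temp shard per bundle.
        List<String> temps = new ArrayList<>();
        for (int b = 0; b < bundles.size(); b++) {
            temps.add(writeTempShard(b, bundles.get(b)));
        }
        // Single-key phase: one finalize call over all temp shards.
        List<String> finals = finalizeShards(temps);
        System.out.println(finals.size()); // prints 3, one final shard per bundle
    }
}
```

My question is essentially whether the runner can rely on every custom connector separating its parallel phase from its single-key phase this cleanly.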