A preferable solution would be to augment the Beam sinks to support these sharding parameters. At the very least, we should probably make running such writes with fixed shards on the Flink runner a loud error in the meantime; a small sketch of the underlying filename collision follows the quoted thread below.
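For concreteness, a guard roughly like the following is one possible shape for that loud error. To be clear, this is only a sketch: the class, the method, and the way the requested shard count and parallelism would reach it are all made up here, not existing Beam or Flink runner code.

// Hypothetical sketch only -- not actual Beam/Flink runner code.
// Idea: fail fast when a sink requests a fixed number of shards,
// instead of silently producing a single incomplete output file.
final class FlinkFixedShardingGuard {

  /**
   * @param requestedNumShards shard count requested by the sink
   *        (e.g. 1 for withoutSharding(), 0 meaning "runner-chosen")
   * @param parallelism the configured Flink parallelism for the write
   */
  static void rejectFixedSharding(int requestedNumShards, int parallelism) {
    if (requestedNumShards > 0) {
      throw new UnsupportedOperationException(
          "Fixed sharding (numShards=" + requestedNumShards + ") is not yet "
              + "supported by the Flink runner (parallelism=" + parallelism
              + "); remove withoutSharding()/withNumShards() for now.");
    }
  }
}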
On Wed, Jun 1, 2016 at 4:17 AM, Aljoscha Krettek <[email protected]> wrote:
> Hi,
> the issue is a bit more complicated and involves the Beam sink API and the
> Flink runner.
>
> I'll have to get a bit into how Beam sinks work. The base class for sinks
> is Sink (TextIO.write gets translated to Write.to(new TextSink())).
> Write.to normally gets translated to three ParDo operations that cooperate
> to do the writing:
>
> - "Initialize": this does the initial initialization of the Sink; this is
> run only once, per sink, non-parallel.
>
> - "WriteBundles": this gets an initialized sink on a side-input and the
> values to write on the main input. This runs in parallel, so for Flink, if
> you set parallelism=6 you'll get 6 parallel instances of this operation at
> runtime. This operation forwards information about where it writes to
> downstream. It does not write to the final file location but to an
> intermediate staging location.
>
> - "Finalize": this gets the initialized sink on the main input and the
> information about written files from "WriteBundles" as a side-input. This
> also only runs once, non-parallel. Here we're writing the intermediate
> files to a final location based on the sharding template.
>
> The problem is that Write.to() and TextSink, as well as all other sinks,
> are not aware of the number of shards. If you set "withoutSharding()" this
> will set the shard template to "" (empty string) and the number of shards
> to 1. "WriteBundles", however, is not aware of this and will write 6
> intermediate files if you set parallelism=6. In "Finalize" we will copy an
> intermediate file to the same final location 6 times based on the sharding
> template. The end result is that you only get one of the six result
> shards.
>
> The reason why this only occurs in the Flink runner is that all other
> runners have special overrides for TextIO.Write and AvroIO.Write that kick
> in if sharding control is required. So, for the time being this is a Flink
> runner bug and we might have to introduce special overrides as well until
> this is solved in the general case.
>
> Cheers,
> Aljoscha
>
> On Wed, 1 Jun 2016 at 07:37 Jean-Baptiste Onofré <[email protected]> wrote:
>>
>> Yes, just tested, it happens only with the Flink runner.
>>
>> Agree to create a JIRA.
>>
>> Regards
>> JB
>>
>> On 06/01/2016 03:41 AM, Davor Bonaci wrote:
>> > This will be a runner-specific issue. It would be best to file a JIRA
>> > issue for this.
>> >
>> > On Tue, May 31, 2016 at 9:46 AM, Jean-Baptiste Onofré <[email protected]
>> > <mailto:[email protected]>> wrote:
>> >
>> > Hi Pawel,
>> >
>> > does it happen only with the Flink runner? I bet it happens with any
>> > runner.
>> >
>> > Let me take a look.
>> >
>> > Regards
>> > JB
>> >
>> > On 05/30/2016 01:38 AM, Pawel Szczur wrote:
>> >
>> > Hi,
>> >
>> > I'm running a pipeline with the Flink backend, Beam bleeding edge,
>> > Oracle Java 1.8, Maven 3.3.3, linux64.
>> >
>> > The pipeline is run with --parallelism=6.
>> >
>> > Adding .withoutSharding() causes the TextIO sink to write only one of
>> > the shards.
>> >
>> > Example use:
>> >
>> > data.apply(TextIO.Write.named("write-debug-csv").to("/tmp/some-stats"));
>> > vs.
>> > data.apply(TextIO.Write.named("write-debug-csv").to("/tmp/some-stats").withoutSharding());
>> >
>> > Result:
>> > Only part of the data is written to the file. After comparing to the
>> > sharded output, it seems to be just one of the shard files.
>> >
>> > Cheers,
>> > Pawel
>> >
>> > --
>> > Jean-Baptiste Onofré
>> > [email protected] <mailto:[email protected]>
>> > http://blog.nanthrax.net
>> > Talend - http://www.talend.com
>>
>> --
>> Jean-Baptiste Onofré
>> [email protected]
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com
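To make the failure mode Aljoscha describes concrete, here is the tiny standalone illustration of the filename collision mentioned above (plain Java, not Beam code; the intermediate file names and the simplified template expansion are invented for the example): with an empty shard template and numShards=1, all six intermediate bundles from the parallel "WriteBundles" instances resolve to the same final path in "Finalize", so only one survives.

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustration only, not Beam code: why parallelism=6 plus withoutSharding()
// (shard template "" and numShards=1) keeps the data of just one bundle.
public class ShardCollisionSketch {

  // Simplified stand-in for expanding a shard name template.
  static String finalPath(String prefix, String template, int shard, int numShards) {
    if (template.isEmpty()) {
      return prefix;                                    // "/tmp/some-stats" for every bundle
    }
    return prefix + "-" + shard + "-of-" + numShards;   // e.g. "/tmp/some-stats-0-of-6"
  }

  public static void main(String[] args) {
    // Six intermediate files, one per parallel "WriteBundles" instance.
    List<String> intermediateFiles =
        Arrays.asList("temp-0", "temp-1", "temp-2", "temp-3", "temp-4", "temp-5");

    // "Finalize": copy each intermediate file to its final location.
    Map<String, String> finalFiles = new LinkedHashMap<>();
    for (String bundle : intermediateFiles) {
      String dest = finalPath("/tmp/some-stats", "", 0, 1); // empty template, 1 shard
      finalFiles.put(dest, bundle);                         // later copies overwrite earlier ones
    }

    // Only one destination remains, holding the contents of a single bundle.
    System.out.println(finalFiles); // {/tmp/some-stats=temp-5}
  }
}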
