A preferable solution would be to augment the Beam sinks to support
these parameters. At the very least, we should probably make running them
with a fixed number of shards a loud error in the meantime.
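
For example, a hypothetical guard in the Flink runner's Write translation
(the class and method names here are made up, not actual runner code) could
fail fast instead of silently dropping shards:

  class ShardingGuard {
    // Hypothetical check: numShards > 0 means the user fixed the shard
    // count (withoutSharding() effectively sets it to 1).
    static void checkShardingSupported(int numShards) {
      if (numShards > 0) {
        throw new UnsupportedOperationException(
            "Fixed sharding (withNumShards / withoutSharding) is not yet"
                + " supported by the Flink runner; remove it for now.");
      }
    }
  }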

On Wed, Jun 1, 2016 at 4:17 AM, Aljoscha Krettek <[email protected]> wrote:
> Hi,
> the issue is a bit more complicated and involves the Beam sink API and the
> Flink runner.
>
> I'll have to get a bit into how Beam sinks work. The base class for sinks is
> Sink (TextIO.Write gets translated to Write.to(new TextSink())). Write.to
> normally gets translated to three ParDo operations that cooperate to do the
> writing (a rough sketch follows the three steps below):
>
>  - "Initialize": this does initial initialization of the Sink, this is run
> only once, per sink, non-parallel.
>
>  - "WriteBundles": this gets an initialized sink on a side-input and the
> values to write on the main input. This runs in parallel, so for Flink, if
> you set parallelism=6 you'll get 6 parallel instances of this operation at
> runtime. This operation forwards information about where it wrote to the
> downstream "Finalize" step. It does not write to the final file location but
> to an intermediate staging location.
>
>  - "Finalize": This gets the initialized sink on the main-input and and the
> information about written files from "WriteBundles" as a side-input. This
> also only runs once, non-parallel. Here we're writing the intermediate files
> to a final location based on the sharding template.
>
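> As a rough illustration of that shape (plain Java, not the actual Sink API;
> all names here are invented), the three steps line up like this:
>
>   // Hypothetical, stripped-down version of the three-step write:
>   // one Initialize, N parallel WriteBundles, one Finalize.
>   import java.util.ArrayList;
>   import java.util.Arrays;
>   import java.util.List;
>
>   public class ThreePhaseWriteSketch {
>     static String initialize() {                  // runs once, non-parallel
>       return "/tmp/staging";                      // staging location (made up)
>     }
>
>     static String writeBundle(String staging, List<String> bundle, int id) {
>       return staging + "/part-" + id;             // one temp file per instance
>     }
>
>     static void finalizeWrite(String staging, List<String> written) {
>       for (String temp : written) {               // runs once, non-parallel
>         System.out.println("rename " + temp + " -> final location");
>       }
>     }
>
>     public static void main(String[] args) {
>       String staging = initialize();
>       List<String> written = new ArrayList<>();
>       for (int i = 0; i < 6; i++) {               // parallelism=6
>         written.add(writeBundle(staging, Arrays.asList("record"), i));
>       }
>       finalizeWrite(staging, written);
>     }
>   }
>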
> The problem is that Write.to() and TextSink, as well as all other sinks, are
> not aware of the number of shards. If you set "withoutSharding()" this will
> set the shard template to "" (empty string) and the number of shards to 1.
> "WriteBundles", however is not aware of this and will write 6 intermediate
> files if you set parallelism=6. In "Finalize" we will copy an intermediate
> file to the same final location 6 times based on the sharding template. The
> end result is that you only get one of the six result shards.
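>
> Concretely, the collision can be reproduced with nothing but plain Java (no
> Beam involved; the paths and names are made up):
>
>   import java.io.IOException;
>   import java.nio.file.*;
>   import java.util.ArrayList;
>   import java.util.Collections;
>   import java.util.List;
>
>   public class ShardCollisionDemo {
>     public static void main(String[] args) throws IOException {
>       Path dir = Files.createTempDirectory("write-demo");
>       int parallelism = 6;        // like --parallelism=6
>       String shardTemplate = "";  // what withoutSharding() effectively sets
>
>       // "WriteBundles": each parallel instance writes its own temp file.
>       List<Path> tempFiles = new ArrayList<>();
>       for (int bundle = 0; bundle < parallelism; bundle++) {
>         Path tmp = dir.resolve("temp-bundle-" + bundle);
>         Files.write(tmp, Collections.singletonList("data from bundle " + bundle));
>         tempFiles.add(tmp);
>       }
>
>       // "Finalize": with an empty shard template every temp file maps to
>       // the same final name, so each move overwrites the previous one.
>       for (Path tmp : tempFiles) {
>         Files.move(tmp, dir.resolve("some-stats" + shardTemplate),
>             StandardCopyOption.REPLACE_EXISTING);
>       }
>
>       // Only one bundle's contents survive in the single output file.
>       System.out.println(Files.readAllLines(dir.resolve("some-stats")));
>     }
>   }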
>
> The reason this occurs only in the Flink runner is that all other
> runners have special overrides for TextIO.Write and AvroIO.Write that kick
> in if sharding control is required. So, for the time being this is a Flink
> runner bug and we might have to introduce special overrides as well until
> this is solved in the general case.
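>
> One way such an override can work is to regroup records into exactly
> numShards groups before writing. A rough plain-Java illustration of the idea
> (not actual runner code; everything here is made up):
>
>   import java.util.ArrayList;
>   import java.util.HashMap;
>   import java.util.List;
>   import java.util.Map;
>
>   public class RunnerOverrideSketch {
>     public static void main(String[] args) {
>       int parallelism = 6;  // runtime parallelism
>       int numShards = 1;    // what withoutSharding() asks for
>
>       List<String> records = new ArrayList<>();
>       for (int i = 0; i < parallelism; i++) {
>         records.add("record from instance " + i);
>       }
>
>       // Re-shard: assign every record to one of numShards groups, so the
>       // runtime parallelism no longer leaks into the output layout.
>       Map<Integer, List<String>> shards = new HashMap<>();
>       for (int i = 0; i < records.size(); i++) {
>         shards.computeIfAbsent(i % numShards, k -> new ArrayList<>())
>             .add(records.get(i));
>       }
>
>       // One writer per requested shard -> exactly numShards output files.
>       shards.forEach((shard, contents) ->
>           System.out.println("write " + contents.size() + " records to "
>               + "/tmp/some-stats-" + shard + "-of-" + numShards));
>     }
>   }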
>
> Cheers,
> Aljoscha
>
> On Wed, 1 Jun 2016 at 07:37 Jean-Baptiste Onofré <[email protected]> wrote:
>>
>> Yes, just tested, it happens only with the Flink runner.
>>
>> Agree to create a Jira.
>>
>> Regards
>> JB
>>
>> On 06/01/2016 03:41 AM, Davor Bonaci wrote:
>> > This will be a runner-specific issue. It would be best to file a
>> > JIRA issue for this.
>> >
>> > On Tue, May 31, 2016 at 9:46 AM, Jean-Baptiste Onofré <[email protected]> wrote:
>> >
>> >     Hi Pawel,
>> >
>> >     does it happen only with the Flink runner? I bet it happens with
>> >     any runner.
>> >
>> >     Let me take a look.
>> >
>> >     Regards
>> >     JB
>> >
>> >     On 05/30/2016 01:38 AM, Pawel Szczur wrote:
>> >
>> >         Hi,
>> >
>> >         I'm running a pipeline with Flink backend, Beam bleeding edge,
>> >         Oracle
>> >         Java 1.8, maven 3.3.3, linux64.
>> >
>> >         The pipeline is run with --parallelism=6.
>> >
>> >         Adding .withoutSharding() causes a TextIO sink to write only
>> >         one of the shards.
>> >
>> >         Example use:
>> >
>> > data.apply(TextIO.Write.named("write-debug-csv").to("/tmp/some-stats"));
>> >         vs.
>> >
>> > data.apply(TextIO.Write.named("write-debug-csv").to("/tmp/some-stats").withoutSharding());
>> >
>> >         Result:
>> >         Only part of the data is written to the file. After comparing
>> >         to the sharded output, it seems to be just one of the shard files.
>> >
>> >         Cheers,
>> >         Pawel
>> >
>> >
>> >     --
>> >     Jean-Baptiste Onofré
>> >     [email protected]
>> >     http://blog.nanthrax.net
>> >     Talend - http://www.talend.com
>> >
>> >
>>
>> --
>> Jean-Baptiste Onofré
>> [email protected]
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com
