[ 
https://issues.apache.org/jira/browse/BEAM-365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Halperin reassigned BEAM-365:
------------------------------------

    Assignee: Daniel Halperin

> TextIO withoutSharding causes Flink to throw IllegalStateException
> ------------------------------------------------------------------
>
>                 Key: BEAM-365
>                 URL: https://issues.apache.org/jira/browse/BEAM-365
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>    Affects Versions: 0.2.0-incubating
>            Reporter: Pawel Szczur
>            Assignee: Daniel Halperin
>
> The exception: 
> {code}
> java.lang.IllegalStateException: Shard name template '' only generated 1 distinct file names for 3 files
> {code}
> The initial discussion took place some time ago; at that time, {{withoutSharding}}
> was silently ignored by the runner.
> Explanation from Aljoscha Krettek:
> {quote}
> Hi,
> the issue is a bit more complicated and involves the Beam sink API and the
> Flink runner.
> I'll have to get a bit into how Beam sinks work. The base class for sinks
> is Sink (TextIO.write gets translated to Write.to(new TextSink())).
> Write.to normally gets translated to three ParDo operations that cooperate
> to do the writing:
>  - "Initialize": this performs the one-time initialization of the Sink. It runs
> only once per sink, non-parallel.
>  - "WriteBundles": this gets an initialized sink on a side-input and the
> values to write on the main input. This runs in parallel, so for Flink, if
> you set parallelism=6 you'll get 6 parallel instances of this operation at
> runtime. This operation forwards information about where it writes to
> downstream. This does not write to the final file location but an
> intermediate staging location.
>  - "Finalize": This gets the initialized sink on the main-input and the
> information about written files from "WriteBundles" as a side-input. This
> also only runs once, non-parallel. Here we're writing the intermediate
> files to a final location based on the sharding template.
> The problem is that Write.to() and TextSink, as well as all other sinks,
> are not aware of the number of shards. If you set "withoutSharding()" this
> will set the shard template to "" (empty string) and the number of shards
> to 1. "WriteBundles", however, is not aware of this and will write 6
> intermediate files if you set parallelism=6. In "Finalize" we will copy an
> intermediate file to the same final location 6 times based on the sharding
> template. The end result is that you only get one of the six result shards.
> The reason this occurs only in the Flink runner is that all other
> runners have special overrides for TextIO.Write and AvroIO.Write that kick
> in if sharding control is required. So, for the time being this is a Flink
> runner bug and we might have to introduce special overrides as well until
> this is solved in the general case.
> Cheers,
> Aljoscha
> {quote}
> Original discussion:
> http://mail-archives.apache.org/mod_mbox/incubator-beam-user/201606.mbox/%3CCAMdX74-VPUsNOc9NKue2A2tYXZisnHNZ7UkPWk82_TFexpnySg%40mail.gmail.com%3E
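
The name collision described in the quoted explanation can be reproduced with a small stand-alone sketch. It is a simplified simulation, not Beam's implementation: the class ShardNameSketch and its methods expand and finalNames are hypothetical names introduced for illustration, and the template expansion is assumed to replace runs of 'S' with the zero-padded shard index and runs of 'N' with the zero-padded shard count. With an empty template, every intermediate file maps to the same final name, which yields the message seen in the report.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified simulation of the "Finalize" naming step. None of these names are Beam APIs.
class ShardNameSketch {

    // Assumed template rule: a run of 'S' becomes the zero-padded shard index,
    // a run of 'N' becomes the zero-padded shard count; other characters are copied.
    static String expand(String template, int shard, int numShards) {
        StringBuilder out = new StringBuilder();
        int i = 0;
        while (i < template.length()) {
            char c = template.charAt(i);
            if (c == 'S' || c == 'N') {
                int j = i;
                while (j < template.length() && template.charAt(j) == c) {
                    j++;
                }
                int value = (c == 'S') ? shard : numShards;
                out.append(String.format("%0" + (j - i) + "d", value));
                i = j;
            } else {
                out.append(c);
                i++;
            }
        }
        return out.toString();
    }

    // One final name per intermediate file (one file per parallel writer instance).
    // Throws if the template cannot tell the files apart.
    static List<String> finalNames(String prefix, String template, int numFiles) {
        List<String> names = new ArrayList<>();
        for (int shard = 0; shard < numFiles; shard++) {
            names.add(prefix + expand(template, shard, numFiles));
        }
        Set<String> distinct = new HashSet<>(names);
        if (distinct.size() != numFiles) {
            throw new IllegalStateException("Shard name template '" + template
                + "' only generated " + distinct.size()
                + " distinct file names for " + numFiles + " files");
        }
        return names;
    }

    public static void main(String[] args) {
        // A sharded template gives each of the 3 intermediate files its own name.
        System.out.println(finalNames("out", "-SSSSS-of-NNNNN", 3));
        // An empty template (the withoutSharding case) collapses them to one name.
        try {
            finalNames("out", "", 3);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this sketch the number of intermediate files stands in for the runner's parallelism, so the failure appears whenever more than one writer instance runs with an empty template.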



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
