That clarifies a lot.

One take is that a ParDo doesn't output to an existing PCollection. A ParDo
returns a PCollection, which it creates and can put special conditions on.
Our Java code might make this a pain but that's conceptually what is going
on. So perhaps the restriction is that if a ParDo has an
OutputReceiver<Row> it should output a PCollection where setCoder(...) will
reject types that don't have a (compatible) schema.
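
To make that concrete, here is a toy model in plain Java. It is only a
sketch of the idea, not Beam's API: ToyCoder, ToyCollection and applyParDo
are hypothetical stand-ins, the schema is just a string, and the
"(compatible)" part of the check is left out (only presence is tested).

```java
import java.util.Optional;

/** Toy model only: none of these classes are Beam's. */
final class SchemaCheckSketch {

  /** Hypothetical stand-in for a coder, which may or may not carry a schema. */
  static final class ToyCoder {
    final String name;
    final Optional<String> schema; // assumption: schema rendered as a string

    ToyCoder(String name, Optional<String> schema) {
      this.name = name;
      this.schema = schema;
    }
  }

  /** Hypothetical stand-in for the PCollection that a ParDo creates. */
  static final class ToyCollection {
    private final boolean requiresSchema;
    private ToyCoder coder;

    ToyCollection(boolean requiresSchema) {
      this.requiresSchema = requiresSchema;
    }

    /** Rejects schema-less coders when the producing ParDo demanded a schema. */
    ToyCollection setCoder(ToyCoder newCoder) {
      if (requiresSchema && !newCoder.schema.isPresent()) {
        throw new IllegalArgumentException(
            "Coder " + newCoder.name + " has no schema, but the producing "
                + "ParDo outputs Row");
      }
      this.coder = newCoder;
      return this;
    }

    ToyCoder getCoder() {
      return coder;
    }
  }

  /**
   * Stand-in for applying a ParDo: the output collection is created here,
   * and the condition is attached to it at creation time.
   */
  static ToyCollection applyParDo(boolean doFnHasRowOutputReceiver) {
    return new ToyCollection(doFnHasRowOutputReceiver);
  }

  public static void main(String[] args) {
    ToyCoder withSchema =
        new ToyCoder("UserTypeSchemaCoder", Optional.of("name:STRING"));
    ToyCoder noSchema = new ToyCoder("FooCoder", Optional.empty());

    // A schema-carrying coder is accepted on a Row-producing ParDo's output.
    applyParDo(true).setCoder(withSchema);

    // A schema-less coder fails while the graph is being built, not at runtime.
    try {
      applyParDo(true).setCoder(noSchema);
    } catch (IllegalArgumentException e) {
      System.out.println("rejected at construction time: " + e.getMessage());
    }
  }
}
```

The point of the sketch is only that the restriction lives on the
PCollection the ParDo creates, so a later setCoder(...) is where a bad
coder would be caught.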

Kenn

On Fri, Jan 11, 2019 at 2:00 PM Reuven Lax <[email protected]> wrote:

> The verification is on the ParDo (or specifically the ParDo's place in the
> graph), not on the PCollection.
>
> If you remember from the original schema discussion on this list, part of
> the design was to allow Rows and user types to be used interchangeably when
> schemas are around. For example, if you have a PCollection<UserType> pc,
> it's legal to write:
>
> pc.apply(ParDo.of(new DoFn....
>   @ProcessElement
>   public void process(@Element Row row) {
>   }
> }));
>
> Practically our goal was to make schemas an integral part of the type
> system, and not force users to write ParDos to convert to PCollection<Row>
> whenever they want to use schema-based transforms; this seamless usability
> is one thing that distinguishes our work on schemas from that of some other
> systems. It also enables us to do things efficiently, as otherwise the cost
> of constant conversions back and forth to Row would start to dominate.
>
> Of course this is only legal if the input PCollection has a schema, so we
> verify that (today in expand()). The problem I'm dealing with is that we
> also allow the same for OutputReceiver (it would somewhat defeat the
> purpose to do this only on input and not on output). So you can have
> something like:
>
> @ProcessElement
> public void process(...., OutputReceiver<Row> output) {
> }
>
> The problem I face today is that this is only truly legal if the
> PCollection that the ParDo is outputting to has a schema, but we don't know
> that until coder inference is done. Right now this means that if a mistake
> is made, it's not caught until the ParDo is actually running - an
> unfortunate situation. I would like to be able to detect these
> OutputReceiver parameters and verify that they are correct before the
> pipeline begins to run.
>
> Reuven
>
> On Fri, Jan 11, 2019 at 1:33 PM Kenneth Knowles <[email protected]> wrote:
>
>> Can you elaborate a bit more? Maybe a specific code example? I'm a little
>> bit concerned about this sort of global verification. If the PCollection
>> gets passed around afterwards, new restrictions on what can be done with it
>> are a pretty big deal.
>>
>> Kenn
>>
>> On Fri, Jan 11, 2019 at 12:58 PM Reuven Lax <[email protected]> wrote:
>>
>>> My problem is exactly outputs. I want to verify schemas for any
>>> OutputReceiver parameters, and I don't think I can do this in expand.
>>>
>>> The best idea I have so far is to create a new PipelineVisitor to do
>>> this, and run that after the normal apply is done.
>>>
>>> Reuven
>>>
>>> On Fri, Jan 11, 2019 at 12:39 PM Kenneth Knowles <[email protected]>
>>> wrote:
>>>
>>>> I believe that today all coders must be fully defined for all arguments
>>>> to expand(). For the outputs, the ParDo outputting should be agnostic, no?
>>>> The constraints on setCoder(...) are hoped to be enough to make sure
>>>> nothing breaks.
>>>>
>>>> Kenn
>>>>
>>>> On Fri, Jan 11, 2019 at 10:41 AM Reuven Lax <[email protected]> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I want to be able to write a verification phase that asserts that
>>>>> input and output schemas for all ParDos match up properly. The only place I
>>>>> can see to do that today is in expand(), however this does not work as
>>>>> Coders may not be fully known when expand is called (remember Schemas are
>>>>> implemented as a special type of Coder today). For example:
>>>>>
>>>>> p.apply(ParDo.of(new MyDoFn()))
>>>>>   .setCoder(new FooCoder());
>>>>>
>>>>> FooCoder is not known yet when expand is called for the ParDo.
>>>>>
>>>>> Is there any place in Beam today where I could set up such a
>>>>> verification pass?
>>>>>
>>>>> Reuven
>>>>>
>>>>
