The verification is on the ParDo (or specifically the ParDo's place in the
graph), not on the PCollection.
As you may remember from the original schema discussion on this list, part of
the design was to allow Rows and user types to be used interchangeably
whenever a schema is available. For example, if you have a
PCollection<UserType> pc, it's legal to write:
pc.apply(ParDo.of(new DoFn<UserType, ...>() {
  @ProcessElement
  public void process(@Element Row row) {
    // Each UserType element of pc is handed to the DoFn as a Row.
  }
}));
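Conceptually, when a DoFn asks for a Row, each UserType element has to be converted into a Row using the PCollection's schema. The sketch below is a stdlib-only illustration, not Beam's implementation: ToyRow is a hypothetical stand-in for Row, UserType is a hypothetical two-field class, and the schema is assumed to be the class's public fields, loosely in the spirit of Beam's JavaFieldSchema.

```java
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical user type: its public fields play the role of the schema.
class UserType {
  public String name;
  public int count;

  UserType(String name, int count) {
    this.name = name;
    this.count = count;
  }
}

// Hypothetical stand-in for Beam's Row: an immutable field-name -> value map.
final class ToyRow {
  private final Map<String, Object> values;

  ToyRow(Map<String, Object> values) {
    this.values = Collections.unmodifiableMap(new LinkedHashMap<>(values));
  }

  Object get(String field) {
    if (!values.containsKey(field)) {
      throw new IllegalArgumentException("No field named " + field);
    }
    return values.get(field);
  }

  int fieldCount() {
    return values.size();
  }
}

final class ToySchemaConversion {
  // Builds a ToyRow from an object's public fields, roughly what happens
  // before an element is handed to a DoFn that declares @Element Row.
  static ToyRow toRow(Object element) {
    Map<String, Object> values = new LinkedHashMap<>();
    for (Field field : element.getClass().getFields()) {
      try {
        values.put(field.getName(), field.get(element));
      } catch (IllegalAccessException e) {
        throw new IllegalStateException(e);
      }
    }
    return new ToyRow(values);
  }
}
```

In Beam the equivalent conversion is handled by the SDK based on the PCollection's schema, so the DoFn author never writes it; that is what makes the interchangeability cheap for users.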
In practice, our goal was to make schemas an integral part of the type
system, rather than forcing users to write ParDos that convert to
PCollection<Row> whenever they want to use schema-based transforms. This
seamless usability is one of the things that distinguishes our work on
schemas from that of some other systems. It also lets us stay efficient:
otherwise, the cost of constantly converting back and forth to Row would
start to dominate.
Of course, this is only legal if the input PCollection has a schema, so we
verify that (today in expand()). The problem I'm dealing with is that we
also allow the same for OutputReceiver parameters (it would somewhat defeat
the purpose to allow this only on input and not on output). So you can have
something like:
@ProcessElement
public void process(..., OutputReceiver<Row> output) {
  // Rows emitted here must be converted to the output PCollection's element type.
}
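On the output side the conversion runs the other way: each Row the DoFn emits has to be turned back into an element of the output PCollection, which only works if that PCollection has a schema. The following stdlib-only sketch uses a hypothetical SchemaAwareReceiver class (not Beam's OutputReceiver) and models a schema as an ordered list of field names, with null meaning "no schema". It illustrates that, without an earlier check, the mistake only surfaces when output(...) is first called at run time.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for an OutputReceiver<Row>; not Beam's API.
// outputSchema is the downstream PCollection's schema, or null if it has none.
final class SchemaAwareReceiver {
  private final List<String> outputSchema;
  private final List<Map<String, Object>> emitted = new ArrayList<>();

  SchemaAwareReceiver(List<String> outputSchema) {
    this.outputSchema = outputSchema;
  }

  // Analogue of output(row): a Row can only be accepted if there is a schema
  // to convert it with, and its fields must match that schema in order.
  void output(Map<String, Object> row) {
    if (outputSchema == null) {
      throw new IllegalStateException(
          "DoFn emits Rows, but the output PCollection has no schema");
    }
    List<String> fields = new ArrayList<>(row.keySet());
    if (!outputSchema.equals(fields)) {
      throw new IllegalArgumentException(
          "Row fields " + fields + " do not match output schema " + outputSchema);
    }
    emitted.add(new LinkedHashMap<>(row));
  }

  int emittedCount() {
    return emitted.size();
  }
}
```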
The problem I face today is that this is only truly legal if the
PCollection the ParDo outputs to has a schema, but we don't know that until
coder inference is done. Right now, this means that if a mistake is made,
it isn't caught until the ParDo is actually running, which is an
unfortunate situation. I would like to be able to detect these
OutputReceiver parameters and verify that they are correct before the
pipeline begins to run.
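A check that runs once the whole pipeline has been constructed, rather than inside expand(), sees each output after any later setCoder(...) calls. The stdlib-only sketch below models that idea with hypothetical classes (ToyPipeline, ToyParDo, ToyPCollection); it is not Beam code. In Beam itself such a pass would presumably walk the graph with a PipelineVisitor passed to Pipeline.traverseTopologically(...) and consult PCollection.hasSchema(), but that mapping is an assumption here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, stdlib-only model of a pipeline graph; not Beam's API.
// A schema can be attached after the producing transform has been applied,
// just as setCoder(...) can be called on an output after apply().
final class ToyPCollection {
  private final String name;
  private boolean hasSchema = false;

  ToyPCollection(String name) {
    this.name = name;
  }

  void attachSchema() {
    hasSchema = true;
  }

  boolean hasSchema() {
    return hasSchema;
  }

  String name() {
    return name;
  }
}

final class ToyParDo {
  final String name;
  final boolean emitsRows; // true if the DoFn declares an OutputReceiver<Row>
  final ToyPCollection output;

  ToyParDo(String name, boolean emitsRows, ToyPCollection output) {
    this.name = name;
    this.emitsRows = emitsRows;
    this.output = output;
  }
}

final class ToyPipeline {
  private final List<ToyParDo> parDos = new ArrayList<>();

  // Analogue of apply()/expand(): the output's schema is usually unknown here,
  // so checking at this point would reject pipelines that become valid later.
  ToyPCollection applyParDo(String name, boolean emitsRows) {
    ToyPCollection output = new ToyPCollection(name + ".out");
    parDos.add(new ToyParDo(name, emitsRows, output));
    return output;
  }

  // Deferred pass, run after construction and before execution: every ParDo
  // that emits Rows must write to a PCollection that has a schema.
  List<String> verifyRowOutputs() {
    List<String> errors = new ArrayList<>();
    for (ToyParDo parDo : parDos) {
      if (parDo.emitsRows && !parDo.output.hasSchema()) {
        errors.add(parDo.name + " emits Rows but " + parDo.output.name() + " has no schema");
      }
    }
    return errors;
  }
}
```

Running verifyRowOutputs() before the schema is attached flags both Row-emitting ParDos, which is why a check at expand() time is premature; running it after construction flags only the ParDo whose output genuinely lacks a schema.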
Reuven
On Fri, Jan 11, 2019 at 1:33 PM Kenneth Knowles wrote:
> Can you elaborate a bit more? Maybe a specific code example? I'm a little
> bit concerned about this sort of global verification. If the PCollection
> gets passed around afterwards, new restrictions on what can be done with it
> are a pretty big deal.
>
> Kenn
>
> On Fri, Jan 11, 2019 at 12:58 PM Reuven Lax wrote:
>
>> My problem is specifically with outputs. I want to verify schemas for any
>> OutputReceiver parameters, and I don't think I can do this in expand().
>>
>> The best idea I have so far is to create a new PipelineVisitor to do
>> this, and run that after the normal apply is done.
>>
>> Reuven
>>
>> On Fri, Jan 11, 2019 at 12:39 PM Kenneth Knowles wrote:
>>
>>> I believe that today all coders must be fully defined for all arguments
>>> to expand(). For the outputs, the ParDo producing them should be agnostic,
>>> no? The constraints on setCoder(...) should hopefully be enough to make
>>> sure nothing breaks.
>>>
>>> Kenn
>>>
>>> On Fri, Jan 11, 2019 at 10:41 AM Reuven Lax wrote:
>>>
>>>> Hi,
>>>>
>>>> I want to be able to write a verification phase that asserts that input
>>>> and output schemas for all ParDos match up properly. The only place I can
>>>> see to do that today is in expand(); however, this does not work because
>>>> Coders may not be fully known when expand() is called (remember that
>>>> Schemas are implemented as a special type of Coder today). For example:
>>>>
>>>> p.apply(ParDo.of(new MyDoFn()))
>>>>     .setCoder(new FooCoder());
>>>>
>>>> FooCoder is not known yet when expand() is called for the ParDo.
>>>>
>>>> Is there any place in Beam today where I could set up such a
>>>> verification pass?
>>>>
>>>> Reuven
>>>>
>>>