Actually, looking at things I think this will be quite simple. Thanks for
the suggestion!

On Fri, Jan 11, 2019 at 2:20 PM Reuven Lax <[email protected]> wrote:

>
>
> On Fri, Jan 11, 2019 at 2:14 PM Kenneth Knowles <[email protected]> wrote:
>
>> That clarifies a lot.
>>
>> One take is that a ParDo doesn't output to an existing PCollection. A
>> ParDo returns a PCollection, which it creates and can put special
>> conditions on.
>>
>
> Yes, that's why I said we are verifying the ParDo's place in the graph.
>
>
>> Our Java code might make this a pain, but that's conceptually what is
>> going on. So perhaps the restriction is that if a ParDo has an
>> OutputReceiver<Row>, it should output a PCollection where setCoder(...) will
>> reject types that don't have a (compatible) schema.
>>
>
> This is an interesting idea, and I like it. Can you suggest how this would
> be done? Today ParDo creates its output as follows. If I wrap that, will I
> break anything, or should everything just work?
>
> PCollection.createPrimitiveOutputInternal(
>         pipeline, windowingStrategy, isBounded, coders.get(outputTag))
>     .setTypeDescriptor((TypeDescriptor) outputTag.getTypeDescriptor());
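>
> To check my understanding of the wrap: the idea would be that any coder
> reaching this output has to carry a schema, along the lines of this sketch
> (validateCoderHasSchema is a made-up helper, not existing API; the
> SchemaCoder instanceof test is just illustrative):
>
> // Hypothetical check a schema-requiring output would run whenever a
> // coder is set on it, including during coder inference.
> static void validateCoderHasSchema(Coder<?> coder, TupleTag<?> outputTag) {
>   if (!(coder instanceof SchemaCoder)) {
>     throw new IllegalArgumentException(
>         "Output " + outputTag + " is consumed as Row, so its coder must"
>             + " be a SchemaCoder, but got " + coder);
>   }
> }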
>
>
>
>
>>
>> Kenn
>>
>> On Fri, Jan 11, 2019 at 2:00 PM Reuven Lax <[email protected]> wrote:
>>
>>> The verification is on the ParDo (or specifically the ParDo's place in
>>> the graph), not on the PCollection.
>>>
>>> If you remember from the original schema discussion on this list, part
>>> of the design was to allow Rows and user types to be used interchangeably
>>> when schemas are around. For example, if you have a PCollection<UserType>
>>> pc, it's legal to write:
>>>
>>> pc.apply(ParDo.of(new DoFn<UserType, Void>() {  // output type immaterial here
>>>   @ProcessElement
>>>   public void process(@Element Row row) {
>>>     // row is the element viewed through UserType's schema
>>>   }
>>> }));
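>>>
>>> (Here UserType stands for any class that carries a schema. Just for
>>> illustration - not from real code - it could be a POJO with an inferred
>>> schema:)
>>>
>>> // Schema inferred from the public fields, which is what lets the
>>> // runner hand the element to the DoFn as a Row.
>>> @DefaultSchema(JavaFieldSchema.class)
>>> public class UserType {
>>>   public String name;
>>>   public int count;
>>> }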
>>>
>>> Practically, our goal was to make schemas an integral part of the type
>>> system, and not force users to write ParDos to convert to PCollection<Row>
>>> whenever they want to use schema-based transforms; this seamless usability
>>> is one thing that distinguishes our work on schemas from that of some other
>>> systems. It also enables us to do things efficiently, as otherwise the cost
>>> of constant conversions back and forth to Row would start to dominate.
>>>
>>> Of course this is only legal if the input PCollection has a schema, so
>>> we verify that (today in expand()). The problem I'm dealing with is that we
>>> also allow the same for OutputReceiver (it would somewhat defeat the
>>> purpose to do this only on input and not on output). So you can have
>>> something like:
>>>
>>> @ProcessElement
>>> public void process(..., OutputReceiver<Row> output) {
>>> }
>>>
>>> The problem I face today is that this is only truly legal if the
>>> PCollection that the ParDo is outputting to has a schema, but we don't know
>>> that until coder inference is done. Right now this means that if a mistake
>>> is made, it's not caught until the ParDo is actually running - an
>>> unfortunate situation. I would like to be able to detect these
>>> OutputReceiver parameters and verify that they are correct before the
>>> pipeline begins to run.
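>>>
>>> Conceptually the check I want is tiny - something like the following,
>>> where declaresRowOutputReceiver is a stand-in for the DoFn parameter
>>> reflection, not real API (hasSchema() is the real PCollection method):
>>>
>>> if (declaresRowOutputReceiver(doFn) && !outputPCollection.hasSchema()) {
>>>   // Fail at construction time instead of when the ParDo runs.
>>>   throw new IllegalStateException(
>>>       "DoFn outputs Row, but its output PCollection has no schema");
>>> }
>>>
>>> The difficulty is that there is currently no construction-time point,
>>> after coder inference, at which to run it.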
>>>
>>> Reuven
>>>
>>> On Fri, Jan 11, 2019 at 1:33 PM Kenneth Knowles <[email protected]> wrote:
>>>
>>>> Can you elaborate a bit more? Maybe a specific code example? I'm a
>>>> little bit concerned about this sort of global verification. If the
>>>> PCollection gets passed around afterwards, new restrictions on what can be
>>>> done with it are a pretty big deal.
>>>>
>>>> Kenn
>>>>
>>>> On Fri, Jan 11, 2019 at 12:58 PM Reuven Lax <[email protected]> wrote:
>>>>
>>>>> My problem is exactly the outputs. I want to verify schemas for any
>>>>> OutputReceiver parameters, and I don't think I can do this in expand.
>>>>>
>>>>> The best idea I have so far is to create a new PipelineVisitor to do
>>>>> this, and run that after the normal apply is done.
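>>>>>
>>>>> Roughly this shape (a sketch only - declaresRowOutputReceiver is a
>>>>> hypothetical helper standing in for the DoFn parameter reflection, and
>>>>> I haven't checked the exact shape of getOutputs() here):
>>>>>
>>>>> class SchemaOutputVerifier extends Pipeline.PipelineVisitor.Defaults {
>>>>>   @Override
>>>>>   public void visitPrimitiveTransform(TransformHierarchy.Node node) {
>>>>>     // (a full version would handle ParDo.SingleOutput as well)
>>>>>     if (!(node.getTransform() instanceof ParDo.MultiOutput)) {
>>>>>       return;
>>>>>     }
>>>>>     DoFn<?, ?> fn = ((ParDo.MultiOutput<?, ?>) node.getTransform()).getFn();
>>>>>     if (!declaresRowOutputReceiver(fn)) {  // hypothetical helper
>>>>>       return;
>>>>>     }
>>>>>     for (PValue output : node.getOutputs().values()) {
>>>>>       if (output instanceof PCollection
>>>>>           && !((PCollection<?>) output).hasSchema()) {
>>>>>         throw new IllegalStateException(
>>>>>             node.getFullName() + " declares OutputReceiver<Row> but its"
>>>>>                 + " output PCollection has no schema");
>>>>>       }
>>>>>     }
>>>>>   }
>>>>> }
>>>>>
>>>>> // After construction, before execution:
>>>>> // pipeline.traverseTopologically(new SchemaOutputVerifier());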
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Fri, Jan 11, 2019 at 12:39 PM Kenneth Knowles <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> I believe that today all coders must be fully defined for all
>>>>>> arguments to expand(). For the outputs, the ParDo outputting should be
>>>>>> agnostic, no? The hope is that the constraints on setCoder(...) are
>>>>>> enough to make sure nothing breaks.
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>> On Fri, Jan 11, 2019 at 10:41 AM Reuven Lax <[email protected]> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I want to be able to write a verification phase that asserts that
>>>>>>> input and output schemas for all ParDos match up properly. The only
>>>>>>> place I can see to do that today is in expand(); however, this does
>>>>>>> not work, as Coders may not be fully known when expand() is called
>>>>>>> (remember, Schemas are implemented as a special type of Coder today).
>>>>>>> For example:
>>>>>>>
>>>>>>> p.apply(ParDo.of(new MyDoFn()))
>>>>>>>     // setCoder runs after apply(), so expand() never sees FooCoder.
>>>>>>>     .setCoder(new FooCoder());
>>>>>>>
>>>>>>> FooCoder is not known yet when expand is called for the ParDo.
>>>>>>>
>>>>>>> Is there any place in Beam today where I could set up such a
>>>>>>> verification pass?
>>>>>>>
>>>>>>> Reuven
>>>>>>>
>>>>>>
