That clarifies a lot. One take is that a ParDo doesn't output to an existing PCollection. A ParDo returns a PCollection, which it creates and can put special conditions on. Our Java code might make this a pain, but that's conceptually what is going on. So perhaps the restriction is that if a ParDo has an OutputReceiver<Row>, it should output a PCollection whose setCoder(...) will reject types that don't have a (compatible) schema.
Kenn

On Fri, Jan 11, 2019 at 2:00 PM Reuven Lax <[email protected]> wrote:

> The verification is on the ParDo (or specifically the ParDo's place in the
> graph), not on the PCollection.
>
> If you remember from the original schema discussion on this list, part of
> the design was to allow Rows and user types to be used interchangeably when
> schemas are around. For example, if you have a PCollection<UserType> pc,
> it's legal to write:
>
> pc.apply(ParDo.of(new DoFn....
>   @ProcessElement
>   public void process(@Element Row row) {
>   }
> }));
>
> Practically, our goal was to make schemas an integral part of the type
> system, and not force users to write ParDos to convert to PCollection<Row>
> whenever they want to use schema-based transforms; this seamless usability
> is one thing that distinguishes our work on schemas from that of some other
> systems. It also enables us to do things efficiently, as otherwise the cost
> of constant conversions back and forth to Row would start to dominate.
>
> Of course this is only legal if the input PCollection has a schema, so we
> verify that (today in expand()). The problem I'm dealing with is that we
> also allow the same for OutputReceiver (it would somewhat defeat the
> purpose to do this only on input and not on output). So you can have
> something like:
>
> @ProcessElement
> public void process(...., OutputReceiver<Row> output) {
> }
>
> The problem I face today is that this is only truly legal if the
> PCollection that the ParDo is outputting to has a schema, but we don't know
> that until coder inference is done. Right now this means that if a mistake
> is made, it's not caught until the ParDo is actually running, which is an
> unfortunate situation. I would like to be able to detect these
> OutputReceiver parameters and verify that they are correct before the
> pipeline begins to run.
>
> Reuven
>
> On Fri, Jan 11, 2019 at 1:33 PM Kenneth Knowles <[email protected]> wrote:
>
>> Can you elaborate a bit more? Maybe a specific code example? I'm a little
>> bit concerned about this sort of global verification. If the PCollection
>> gets passed around afterwards, new restrictions on what can be done with it
>> are a pretty big deal.
>>
>> Kenn
>>
>> On Fri, Jan 11, 2019 at 12:58 PM Reuven Lax <[email protected]> wrote:
>>
>>> My problem is exactly outputs. I want to verify schemas for any
>>> OutputReceiver parameters, and I don't think I can do this in expand.
>>>
>>> The best idea I have so far is to create a new PipelineVisitor to do
>>> this, and run that after the normal apply is done.
>>>
>>> Reuven
>>>
>>> On Fri, Jan 11, 2019 at 12:39 PM Kenneth Knowles <[email protected]>
>>> wrote:
>>>
>>>> I believe that today all coders must be fully defined for all arguments
>>>> to expand(). For the outputs, the ParDo outputting should be agnostic, no?
>>>> The constraints on setCoder(...) are hoped to be enough to make sure
>>>> nothing breaks.
>>>>
>>>> Kenn
>>>>
>>>> On Fri, Jan 11, 2019 at 10:41 AM Reuven Lax <[email protected]> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I want to be able to write a verification phase that asserts that
>>>>> input and output schemas for all ParDos match up properly. The only place I
>>>>> can see to do that today is in expand(); however, this does not work, as
>>>>> Coders may not be fully known when expand is called (remember Schemas are
>>>>> implemented as a special type of Coder today). For example:
>>>>>
>>>>> p.apply(ParDo.of(new MyDoFn()))
>>>>>     .setCoder(new FooCoder());
>>>>>
>>>>> FooCoder is not known yet when expand is called for the ParDo.
>>>>>
>>>>> Is there any place in Beam today where I could set up such a
>>>>> verification pass?
>>>>>
>>>>> Reuven
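Reuven's proposal of a separate verification pass, run once graph construction is finished, can be illustrated with a small self-contained Java sketch. It is a hypothetical toy model, not Beam code: ToyPipeline, ToyCoder, ToyCollection, ParDoNode, applyParDo and verifyRowOutputs are invented names loosely standing in for Pipeline, Coder, PCollection, a ParDo node, apply() and a PipelineVisitor pass. What it shows is the ordering problem: the output coder of a ParDo may only be set after the ParDo has been applied, so a check at apply time sees an incomplete graph, while a pass run afterwards sees the final coders.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model, NOT Beam's API. It only illustrates why a check inside
// apply()/expand() is too early: the output coder may be set after the ParDo is applied.
final class ToyPipeline {

  /** Stand-in for a Coder; schemaAware mimics "a coder that carries a schema". */
  static final class ToyCoder {
    final String name;
    final boolean schemaAware;

    ToyCoder(String name, boolean schemaAware) {
      this.name = name;
      this.schemaAware = schemaAware;
    }
  }

  /** Stand-in for a PCollection; its coder stays null until someone sets it. */
  static final class ToyCollection {
    ToyCoder coder;

    ToyCollection setCoder(ToyCoder coder) {
      this.coder = coder;
      return this;
    }
  }

  /** Stand-in for a ParDo; emitsRows mimics a DoFn with an OutputReceiver<Row> parameter. */
  static final class ParDoNode {
    final String name;
    final boolean emitsRows;
    final ToyCollection output;

    ParDoNode(String name, boolean emitsRows, ToyCollection output) {
      this.name = name;
      this.emitsRows = emitsRows;
      this.output = output;
    }
  }

  private final List<ParDoNode> nodes = new ArrayList<>();

  /** Like apply(): records the node and returns a new output whose coder is not yet known. */
  ToyCollection applyParDo(String name, boolean emitsRows) {
    ToyCollection out = new ToyCollection();
    nodes.add(new ParDoNode(name, emitsRows, out));
    return out;
  }

  /**
   * The deferred pass: call once after graph construction, when coders are final.
   * Returns the names of Row-emitting ParDos whose output has no schema-aware coder.
   * A real pass would run after coder inference; here an unset coder counts as "no schema".
   */
  List<String> verifyRowOutputs() {
    List<String> bad = new ArrayList<>();
    for (ParDoNode n : nodes) {
      boolean hasSchema = n.output.coder != null && n.output.coder.schemaAware;
      if (n.emitsRows && !hasSchema) {
        bad.add(n.name);
      }
    }
    return bad;
  }
}
```

For example, applying a Row-emitting ParDo and then calling setCoder with a non-schema coder is accepted during construction, and only verifyRowOutputs() reports it. In real Beam, such a pass would presumably be a Pipeline.PipelineVisitor run via Pipeline.traverseTopologically(...) after coder inference; that wiring is not shown here.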

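Kenn's alternative moves the check into the PCollection that the ParDo creates, so a bad coder fails as soon as setCoder(...) is called. A minimal sketch, again a hypothetical toy model rather than Beam's API (RowOutputCollection, ToyCoder and the requiresSchema flag are invented names): the producing ParDo marks its output as requiring a schema, and setCoder(...) rejects a coder without one immediately.

```java
// Hypothetical toy model, NOT Beam's API: the output collection that a Row-emitting
// ParDo creates carries a flag, and setCoder(...) enforces it eagerly.
final class RowOutputCollection {

  /** Stand-in for a Coder; schemaAware mimics "a coder that carries a schema". */
  static final class ToyCoder {
    final String name;
    final boolean schemaAware;

    ToyCoder(String name, boolean schemaAware) {
      this.name = name;
      this.schemaAware = schemaAware;
    }
  }

  // Set by the producing ParDo, e.g. when its DoFn declares an OutputReceiver<Row>.
  private final boolean requiresSchema;
  private ToyCoder coder;

  RowOutputCollection(boolean requiresSchema) {
    this.requiresSchema = requiresSchema;
  }

  /** Rejects a coder without a schema if this collection was created by a Row-emitting ParDo. */
  RowOutputCollection setCoder(ToyCoder coder) {
    if (requiresSchema && !coder.schemaAware) {
      throw new IllegalArgumentException(
          "Coder " + coder.name + " has no schema, but this collection is produced by a"
              + " ParDo that outputs Rows");
    }
    this.coder = coder;
    return this;
  }

  ToyCoder getCoder() {
    return coder;
  }
}
```

The trade-off Kenn raises in the thread still applies: the restriction travels with the PCollection wherever it is passed. This sketch also only checks that some schema is present, not that it is compatible with what the DoFn emits.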