Actually, looking at things, I think this will be quite simple. Thanks for the suggestion!
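
The "simple" part: per Kenn's suggestion, the check itself is tiny. A minimal sketch of just the validation (requireSchemaCoder is a made-up helper name, not existing Beam API; SchemaCoder is the real schema-backed Coder implementation):

    import org.apache.beam.sdk.coders.Coder;
    import org.apache.beam.sdk.schemas.SchemaCoder;

    final class SchemaCoderChecks {
      private SchemaCoderChecks() {}

      // Reject any coder that does not carry a schema. The idea is to call
      // this from setCoder() on the PCollection that the ParDo creates,
      // whenever the DoFn declares an OutputReceiver<Row> parameter.
      static <T> Coder<T> requireSchemaCoder(Coder<T> coder) {
        if (!(coder instanceof SchemaCoder)) {
          throw new IllegalArgumentException(
              "This ParDo output is consumed as Row, but coder " + coder
                  + " does not carry a schema");
        }
        return coder;
      }
    }

Hooking that into the PCollection returned by createPrimitiveOutputInternal, rather than checking in expand(), is what moves the failure from run time to construction time: setCoder() only fires once the coder is actually known.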
On Fri, Jan 11, 2019 at 2:20 PM Reuven Lax <[email protected]> wrote:

> On Fri, Jan 11, 2019 at 2:14 PM Kenneth Knowles <[email protected]> wrote:
>
>> That clarifies a lot.
>>
>> One take is that a ParDo doesn't output to an existing PCollection. A
>> ParDo returns a PCollection, which it creates and can put special
>> conditions on.
>
> Yes, that's why I said we are verifying the ParDo's place in the graph.
>
>> Our Java code might make this a pain, but that's conceptually what is
>> going on. So perhaps the restriction is that if a ParDo has an
>> OutputReceiver<Row>, it should output a PCollection where setCoder(...)
>> will reject types that don't have a (compatible) schema.
>
> This is an interesting idea, and I like it. Can you suggest how this
> would be done? Today ParDo creates its output as follows. If I wrap
> that, will I break anything, or should everything just work?
>
>     PCollection.createPrimitiveOutputInternal(
>             pipeline, windowingStrategy, isBounded, coders.get(outputTag))
>         .setTypeDescriptor((TypeDescriptor) outputTag.getTypeDescriptor())
>
>> Kenn
>>
>> On Fri, Jan 11, 2019 at 2:00 PM Reuven Lax <[email protected]> wrote:
>>
>>> The verification is on the ParDo (or specifically the ParDo's place
>>> in the graph), not on the PCollection.
>>>
>>> If you remember from the original schema discussion on this list,
>>> part of the design was to allow Rows and user types to be used
>>> interchangeably when schemas are around. For example, if you have a
>>> PCollection<UserType> pc, it's legal to write:
>>>
>>>     pc.apply(ParDo.of(new DoFn<UserType, ...>() {
>>>       @ProcessElement
>>>       public void process(@Element Row row) {
>>>       }
>>>     }));
>>>
>>> Practically, our goal was to make schemas an integral part of the
>>> type system, and not force users to write ParDos to convert to
>>> PCollection<Row> whenever they want to use schema-based transforms;
>>> this seamless usability is one thing that distinguishes our work on
>>> schemas from that of some other systems. It also enables us to do
>>> things efficiently, as otherwise the cost of constant conversions
>>> back and forth to Row would start to dominate.
>>>
>>> Of course this is only legal if the input PCollection has a schema,
>>> so we verify that (today in expand()). The problem I'm dealing with
>>> is that we also allow the same for OutputReceiver (it would somewhat
>>> defeat the purpose to do this only on input and not on output). So
>>> you can have something like:
>>>
>>>     @ProcessElement
>>>     public void process(...., OutputReceiver<Row> output) {
>>>     }
>>>
>>> The problem I face today is that this is only truly legal if the
>>> PCollection that the ParDo is outputting to has a schema, but we
>>> don't know that until coder inference is done. Right now this means
>>> that if a mistake is made, it's not caught until the ParDo is
>>> actually running - an unfortunate situation. I would like to be able
>>> to detect these OutputReceiver parameters and verify that they are
>>> correct before the pipeline begins to run.
>>>
>>> Reuven
>>>
>>> On Fri, Jan 11, 2019 at 1:33 PM Kenneth Knowles <[email protected]> wrote:
>>>
>>>> Can you elaborate a bit more? Maybe a specific code example? I'm a
>>>> little bit concerned about this sort of global verification. If the
>>>> PCollection gets passed around afterwards, new restrictions on what
>>>> can be done with it are a pretty big deal.
>>>>
>>>> Kenn
>>>>
>>>> On Fri, Jan 11, 2019 at 12:58 PM Reuven Lax <[email protected]> wrote:
>>>>
>>>>> My problem is exactly outputs. I want to verify schemas for any
>>>>> OutputReceiver parameters, and I don't think I can do this in
>>>>> expand().
>>>>>
>>>>> The best idea I have so far is to create a new PipelineVisitor to
>>>>> do this, and run that after the normal apply is done.
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Fri, Jan 11, 2019 at 12:39 PM Kenneth Knowles <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> I believe that today all coders must be fully defined for all
>>>>>> arguments to expand(). For the outputs, the ParDo outputting
>>>>>> should be agnostic, no? The constraints on setCoder(...) are
>>>>>> hoped to be enough to make sure nothing breaks.
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>> On Fri, Jan 11, 2019 at 10:41 AM Reuven Lax <[email protected]> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I want to be able to write a verification phase that asserts
>>>>>>> that input and output schemas for all ParDos match up properly.
>>>>>>> The only place I can see to do that today is in expand();
>>>>>>> however, this does not work, as coders may not be fully known
>>>>>>> when expand() is called (remember, schemas are implemented as a
>>>>>>> special type of Coder today). For example:
>>>>>>>
>>>>>>>     p.apply(ParDo.of(new MyDoFn()))
>>>>>>>         .setCoder(new FooCoder());
>>>>>>>
>>>>>>> FooCoder is not known yet when expand() is called for the ParDo.
>>>>>>>
>>>>>>> Is there any place in Beam today where I could set up such a
>>>>>>> verification pass?
>>>>>>>
>>>>>>> Reuven
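
P.S. If the setCoder route turns out not to be enough, here is roughly what the PipelineVisitor fallback mentioned earlier in the thread could look like. Only a sketch: traverseTopologically, PipelineVisitor.Defaults, and PCollection.hasSchema() are existing API, but declaresRowOutputReceiver is a stub standing in for real DoFn signature inspection (e.g. via DoFnSignatures), and for brevity only ParDo.MultiOutput is handled here.

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.runners.TransformHierarchy;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PValue;

    final class RowOutputVerifier {
      private RowOutputVerifier() {}

      // Walk the fully-applied graph and fail fast on any ParDo whose DoFn
      // declares OutputReceiver<Row> but whose output has no schema.
      static void verify(Pipeline pipeline) {
        pipeline.traverseTopologically(
            new Pipeline.PipelineVisitor.Defaults() {
              @Override
              public void visitPrimitiveTransform(TransformHierarchy.Node node) {
                if (node.getTransform() instanceof ParDo.MultiOutput
                    && declaresRowOutputReceiver(
                        (ParDo.MultiOutput<?, ?>) node.getTransform())) {
                  for (PValue output : node.getOutputs().values()) {
                    if (output instanceof PCollection
                        && !((PCollection<?>) output).hasSchema()) {
                      throw new IllegalStateException(
                          "DoFn declares OutputReceiver<Row>, but output "
                              + output.getName() + " has no schema");
                    }
                  }
                }
              }
            });
      }

      // Stub: the real check would inspect the DoFn's signature (e.g. with
      // DoFnSignatures) for an OutputReceiver<Row> parameter.
      static boolean declaresRowOutputReceiver(ParDo.MultiOutput<?, ?> parDo) {
        return false; // placeholder
      }
    }

Running this after all applies (or just before run()) means coder inference has had a chance to resolve, which is exactly the information expand() lacks.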
