On Thu, Apr 2, 2020 at 7:54 AM Chamikara Jayalath <[email protected]> wrote:
> On Thu, Apr 2, 2020 at 7:48 AM Robert Burke <[email protected]> wrote:
>
>> In particular, ideally the Dataflow Service is handling the Dataflow-specific format translation, rather than each SDK. Make the v1beta3 pipeline an internal detail.
>>
>> Ideally Dataflow would support a JobManagement endpoint directly, but I imagine that's a more involved task that's out of scope for now.
>
> Yeah, I think we can just embed the Runner API proto in the Dataflow job request (or store it in GCS and download it in the router if it's too large). Then the Runner API proto to Dataflow proto translation can occur within the Dataflow service, and all SDKs can share that translation logic ((3) below). I agree that fully migrating the Dataflow service to the Job Management API seems to be out of scope.

I've been hoping for that day for a long time now :). I wonder how hard it would be to extend/embed the existing Go translation code into the router.

>> On Thu, Apr 2, 2020, 7:43 AM Chamikara Jayalath <[email protected]> wrote:
>>
>>> On Wed, Apr 1, 2020 at 11:31 AM Sam Rohde <[email protected]> wrote:
>>>
>>>> Okay cool, so it sounds like the cleanup can be done in two phases: move the apply_ methods to transform replacements, then move Dataflow onto the Cloudv1b3 protos. AFAIU, after phase one the Pipeline object will be portable? If the InteractiveRunner were to make a Pipeline object, then it could be passed to the DataflowRunner to run, correct?
>>>
>>> Currently we do the following.
>>>
>>> (1) Currently Java and Python SDKs
>>> SDK-specific object representation -> Dataflow job request (v1beta3) -> Dataflow service-specific representation
>>> Beam Runner API proto -> store in GCS -> download in workers.
>>>
>>> (2) Currently Go SDK
>>> SDK-specific object representation -> Beam Runner API proto -> Dataflow job request (v1beta3) -> Dataflow service-specific representation
>>>
>>> We got cross-language (for Python) working for (1) above, but the code would be much cleaner if we could do (2) for Python and Java.
>>>
>>> I think the cleanest approach is the following, which will allow us to share translation code across SDKs.
>>> (3) For all SDKs
>>> SDK-specific object representation -> Runner API proto embedded in the Dataflow job request -> Runner API proto translated to the internal Dataflow-specific representation within the Dataflow service
>>>
>>> I think we should go for a cleaner approach here ((2) or (3)) instead of trying to do it in multiple steps (we'll have to keep updating features such as cross-language to be in lockstep, which will be hard and result in a lot of throwaway work).
>>>
>>> Thanks,
>>> Cham
>>>
>>>> On Tue, Mar 31, 2020 at 6:01 PM Robert Burke <[email protected]> wrote:
>>>>
>>>>> +1 to translation from Beam pipeline protos.
>>>>>
>>>>> The Go SDK does that currently in dataflowlib/translate.go to handle the current Dataflow situation, so it's certainly doable.
>>>>>
>>>>> On Tue, Mar 31, 2020, 5:48 PM Robert Bradshaw <[email protected]> wrote:
>>>>>
>>>>>> On Tue, Mar 31, 2020 at 12:06 PM Sam Rohde <[email protected]> wrote:
>>>>>>
>>>>>>> Hi All,
>>>>>>>
>>>>>>> I am currently investigating making the Python DataflowRunner use a portable pipeline representation so that we can eventually get rid of the Pipeline(runner) weirdness.
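
(Aside, to make the "portable pipeline representation" concrete: the proto is already obtainable from the Python SDK today. A rough sketch of what option (3) would mean from the SDK side; only to_runner_api() and SerializeToString() are real APIs here, and the job-request field name at the end is invented for illustration.)

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    # Build a pipeline without tying construction to a particular runner.
    p = beam.Pipeline(options=PipelineOptions())
    _ = p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x * x)

    # The portable representation: a beam_runner_api_pb2.Pipeline proto.
    pipeline_proto = p.to_runner_api()

    # Under (3), the SDK would ship this proto (serialized, or a GCS path to
    # it if too large) inside the Dataflow job request and let the service do
    # the v1beta3 translation. 'runner_api_pipeline' is a made-up name, not
    # an actual Dataflow API field.
    job_request = {'runner_api_pipeline': pipeline_proto.SerializeToString()}
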
>>>>>>> In that case, I have a lot of questions about the Python DataflowRunner:
>>>>>>>
>>>>>>> *PValueCache*
>>>>>>>
>>>>>>> - Why does this exist?
>>>>>>
>>>>>> This is historical baggage from the (long gone) first direct runner, when actual computed PCollections were cached, and the DataflowRunner inherited it.
>>>>>>
>>>>>>> *DataflowRunner*
>>>>>>>
>>>>>>> - I see that the DataflowRunner defines some PTransforms as runner-specific primitives by returning a PCollection.from_(...) in apply_ methods. Then in the run_ methods, it references the PValueCache to add steps.
>>>>>>>    - How does this add steps?
>>>>>>>    - Where does it cache the values to?
>>>>>>>    - How does the runner harness pick up these cached values to create new steps?
>>>>>>>    - How is this information communicated to the runner harness?
>>>>>>> - Why do the following transforms need to be overridden: GroupByKey, WriteToBigQuery, CombineValues, Read?
>>>>>>
>>>>>> Each of these four has a different implementation on Dataflow.
>>>>>>
>>>>>>> - Why doesn't the ParDo transform need to be overridden? I see that it has a run_ method but no apply_ method.
>>>>>>
>>>>>> apply_ is called at pipeline construction time; all of these should be replaced by PTransformOverrides. run_ is called after pipeline construction to actually build up the Dataflow graph.
>>>>>>
>>>>>>> *Possible fixes*
>>>>>>> I was thinking of getting rid of the apply_ and run_ methods and replacing them with a PTransformOverride and a simple PipelineVisitor, respectively. Is this feasible? Am I missing any assumptions that would make this infeasible?
>>>>>>
>>>>>> If we're going to overhaul how the runner works, it would be best to make DataflowRunner a direct translator from Beam Runner API protos to Cloudv1b3 protos, rather than manipulating the intermediate Python representation (which no one wants to change for fear of messing up the DataflowRunner and causing headaches for cross-language).
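
To put a bit more shape on the PTransformOverride + PipelineVisitor idea Sam describes, here's a rough sketch with toy transforms. In the real DataflowRunner the overrides would target GroupByKey, CombineValues, Read, and WriteToBigQuery, and the visitor would emit Dataflow steps instead of printing; the SquareAll classes below are made up for illustration.

    import apache_beam as beam
    from apache_beam.pipeline import PipelineVisitor, PTransformOverride

    # Toy composite standing in for a transform the runner needs to replace.
    class SquareAll(beam.PTransform):
        def expand(self, pcoll):
            return pcoll | beam.Map(lambda x: x * x)

    # Toy stand-in for the runner-specific replacement.
    class _RunnerSquareAll(beam.PTransform):
        def expand(self, pcoll):
            return pcoll | beam.Map(lambda x: x ** 2)

    class SquareAllOverride(PTransformOverride):
        def matches(self, applied_ptransform):
            return isinstance(applied_ptransform.transform, SquareAll)

        def get_replacement_transform(self, ptransform):
            return _RunnerSquareAll()

    p = beam.Pipeline()
    _ = p | beam.Create([1, 2, 3]) | 'Square' >> SquareAll()

    # Construction time: replaces the apply_ special-casing.
    p.replace_all([SquareAllOverride()])

    # Post-construction: replaces the run_ methods; walk the graph and build
    # whatever job representation the runner needs.
    class StepBuildingVisitor(PipelineVisitor):
        def visit_transform(self, transform_node):
            print('would emit a step for', transform_node.full_label)

    p.visit(StepBuildingVisitor())

A proto-based translator along the lines Robert suggests would do roughly the same traversal, but over the components of the Runner API pipeline proto instead of the Python transform tree.
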
