On Thu, Apr 2, 2020 at 7:48 AM Robert Burke <[email protected]> wrote:
> In particular, ideally the Dataflow Service is handling the Dataflow > specific format translation, rather than each SDK. Move the v1 beta3 > pipeline to an internal detail. > > Ideally Dataflow would support a JobManagment endpoint directly, but I > imagine that's a more involved task that's out of scope for now. > Yeah, I think we can just embed the runner API proto in Dataflow job request (or store it in GCS and Download in router if too large). Then runner API proto to Dataflow proto translation can occur within Dataflow service and all SDKs can share that translation logic ((3) below). I agree that fully migrating Dataflow service to be on job management API seems to be out of scope. > > On Thu, Apr 2, 2020, 7:43 AM Chamikara Jayalath <[email protected]> > wrote: > >> >> >> On Wed, Apr 1, 2020 at 11:31 AM Sam Rohde <[email protected]> wrote: >> >>> Okay cool, so it sounds like the cleanup can be done in two phases: move >>> the apply_ methods to transform replacements, then move Dataflow onto the >>> Cloudv1b3 protos. AFAIU, after phase one will make the Pipeline object >>> portable? If the InteractiveRunner were to make a Pipeline object, then it >>> could be passed to the DataflowRunner to run, correct? >>> >> >> Currently we do the following. >> >> (1) Currently Java and Python SDKs >> SDK specific object representation -> Dataflow job request (v1beta3) -> >> Dataflow service specific representation >> Beam Runner API proto -> store in GCS -> Download in workers. >> >> (2) Currently Go SDK >> SDK specific object representation -> Beam Runner API proto -> Dataflow >> job request (v1beta3) -> Dataflow service specific representation >> >> We got cross-language (for Python) working for (1) above but code will be >> much cleaner if we could do (2) for Python and Java >> >> I think the cleanest approach is following which will allow us to share >> translation code across SDKs. >> (3) For all SDKs >> SDK specific object representation -> Runner API proto embedded in >> Dataflow job request -> Runner API proto to internal Dataflow specific >> representation within Dataflow service >> >> I think we should go for a cleaner approach here ((2) or (3)) instead of >> trying to do it in multiple steps (we'll have to keep updating features >> such as a cross-language to be in lockstep which will be hard and result in >> a lot of throwaway work). >> >> Thanks, >> Cham >> >> >>> On Tue, Mar 31, 2020 at 6:01 PM Robert Burke <[email protected]> wrote: >>> >>>> +1 to translation from beam pipeline Protos. >>>> >>>> The Go SDK does that currently in dataflowlib/translate.go to handle >>>> the current Dataflow situation, so it's certainly doable. >>>> >>>> On Tue, Mar 31, 2020, 5:48 PM Robert Bradshaw <[email protected]> >>>> wrote: >>>> >>>>> On Tue, Mar 31, 2020 at 12:06 PM Sam Rohde <[email protected]> wrote: >>>>> >>>>>> Hi All, >>>>>> >>>>>> I am currently investigating making the Python DataflowRunner to use >>>>>> a portable pipeline representation so that we can eventually get rid of >>>>>> the >>>>>> Pipeline(runner) weirdness. >>>>>> >>>>>> In that case, I have a lot questions about the Python DataflowRunner: >>>>>> >>>>>> *PValueCache* >>>>>> >>>>>> - Why does this exist? >>>>>> >>>>>> This is historical baggage from the (long gone) first direct runner >>>>> when actual computed PCollections were cached, and the DataflowRunner >>>>> inherited it. >>>>> >>>>> >>>>>> *DataflowRunner* >>>>>> >>>>>> - I see that the DataflowRunner defines some PTransforms as >>>>>> runner-specific primitives by returning a PCollection.from_(...) in >>>>>> apply_ >>>>>> methods. Then in the run_ methods, it references the PValueCache to >>>>>> add >>>>>> steps. >>>>>> - How does this add steps? >>>>>> - Where does it cache the values to? >>>>>> - How does the runner harness pick up these cached values to >>>>>> create new steps? >>>>>> - How is this information communicated to the runner harness? >>>>>> - Why do the following transforms need to be overridden: >>>>>> GroupByKey, WriteToBigQuery, CombineValues, Read? >>>>>> >>>>>> Each of these four has a different implementation on Dataflow. >>>>> >>>>>> >>>>>> - Why doesn't the ParDo transform need to be overridden? I see >>>>>> that it has a run_ method but no apply_ method. >>>>>> >>>>>> apply_ is called at pipeline construction time, all of these should >>>>> be replaced by PTransformOverrides. run_ is called after pipeline >>>>> construction to actually build up the dataflow graph. >>>>> >>>>> >>>>>> *Possible fixes* >>>>>> I was thinking of getting rid of the apply_ and run_ methods and >>>>>> replacing those with a PTransformOverride and a simple PipelineVisitor, >>>>>> respectively. Is this feasible? Am I missing any assumptions that don't >>>>>> make this feasible? >>>>>> >>>>> >>>>> If we're going to overhaul how the runner works, it would be best to >>>>> make DataflowRunner direct a translator from Beam runner api protos to >>>>> Cloudv1b3 protos, rather than manipulate the intermediate Python >>>>> representation (which no one wants to change for fear of messing up >>>>> DataflowRunner and cause headaches for cross langauge). >>>>> >>>>> >>>>>
