I think making the Dataflow service translate from the pipeline proto directly will be a lot of work.
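For concreteness, a minimal sketch (public Beam Python APIs only) of the SDK-side half of that handoff: the Runner API proto below is what such a service would have to consume; whether the bytes get embedded in the job request or staged to GCS is left open here.

    import apache_beam as beam

    # Build any pipeline; no runner is needed just to produce the proto.
    p = beam.Pipeline()
    _ = p | beam.Create(['a', 'b', 'c']) | beam.Map(str.upper)

    # The portable, SDK-agnostic description of the pipeline.
    pipeline_proto = p.to_runner_api()  # a beam_runner_api_pb2.Pipeline message

    # These bytes could be embedded directly in the Dataflow job request, or
    # staged to GCS and referenced by path if the payload is too large.
    serialized = pipeline_proto.SerializeToString()
    print(type(pipeline_proto).__name__, len(serialized), 'bytes')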
On Thu, Apr 2, 2020 at 6:03 PM Robert Burke <[email protected]> wrote:
> It's stateless translation code and nothing is sourced outside of the beam pipeline proto, so it should be fairly straightforward code to write and test.
>
> One can collect several before-and-afters of the existing translations and use them to validate.
>
> There are a few quirks that were previously necessary, though, to get Dataflow to work properly for the Go SDK, in particular around DoFns without outputs, but that's reasonably clear in the translator.
>
> On Thu, Apr 2, 2020, 5:57 PM Robert Bradshaw <[email protected]> wrote:
>> On Thu, Apr 2, 2020 at 7:54 AM Chamikara Jayalath <[email protected]> wrote:
>>> On Thu, Apr 2, 2020 at 7:48 AM Robert Burke <[email protected]> wrote:
>>>> In particular, ideally the Dataflow service would handle the Dataflow-specific format translation rather than each SDK, making the v1beta3 pipeline an internal detail.
>>>>
>>>> Ideally Dataflow would support a JobManagement endpoint directly, but I imagine that's a more involved task that's out of scope for now.
>>>
>>> Yeah, I think we can just embed the runner API proto in the Dataflow job request (or store it in GCS and download it in the router if too large). Then the runner API proto to Dataflow proto translation can occur within the Dataflow service and all SDKs can share that translation logic ((3) below). I agree that fully migrating the Dataflow service onto the job management API seems to be out of scope.
>>
>> I've been hoping for that day for a long time now :). I wonder how hard it would be to extend/embed the existing Go translation code into the router.
>>
>>>> On Thu, Apr 2, 2020, 7:43 AM Chamikara Jayalath <[email protected]> wrote:
>>>>> On Wed, Apr 1, 2020 at 11:31 AM Sam Rohde <[email protected]> wrote:
>>>>>> Okay cool, so it sounds like the cleanup can be done in two phases: move the apply_ methods to transform replacements, then move Dataflow onto the Cloudv1b3 protos. AFAIU, phase one will make the Pipeline object portable? If the InteractiveRunner were to make a Pipeline object, then it could be passed to the DataflowRunner to run, correct?
>>>>>
>>>>> Currently we do the following.
>>>>>
>>>>> (1) Currently Java and Python SDKs:
>>>>> SDK-specific object representation -> Dataflow job request (v1beta3) -> Dataflow service-specific representation
>>>>> Beam Runner API proto -> store in GCS -> download in workers
>>>>>
>>>>> (2) Currently Go SDK:
>>>>> SDK-specific object representation -> Beam Runner API proto -> Dataflow job request (v1beta3) -> Dataflow service-specific representation
>>>>>
>>>>> We got cross-language (for Python) working for (1) above, but the code will be much cleaner if we could do (2) for Python and Java.
>>>>>
>>>>> I think the cleanest approach is the following, which will allow us to share translation code across SDKs.
>>>>>
>>>>> (3) For all SDKs:
>>>>> SDK-specific object representation -> Runner API proto embedded in Dataflow job request -> Runner API proto to internal Dataflow-specific representation within Dataflow service
>>>>>
>>>>> I think we should go for a cleaner approach here ((2) or (3)) instead of trying to do it in multiple steps (we'll have to keep updating features such as cross-language to be in lockstep, which will be hard and result in a lot of throwaway work).
>>>>>
>>>>> Thanks,
>>>>> Cham
>>>>>
>>>>>> On Tue, Mar 31, 2020 at 6:01 PM Robert Burke <[email protected]> wrote:
>>>>>>> +1 to translation from beam pipeline protos.
>>>>>>>
>>>>>>> The Go SDK does that currently in dataflowlib/translate.go to handle the current Dataflow situation, so it's certainly doable.
>>>>>>>
>>>>>>> On Tue, Mar 31, 2020, 5:48 PM Robert Bradshaw <[email protected]> wrote:
>>>>>>>> On Tue, Mar 31, 2020 at 12:06 PM Sam Rohde <[email protected]> wrote:
>>>>>>>>> Hi All,
>>>>>>>>>
>>>>>>>>> I am currently investigating making the Python DataflowRunner use a portable pipeline representation so that we can eventually get rid of the Pipeline(runner) weirdness.
>>>>>>>>>
>>>>>>>>> In that case, I have a lot of questions about the Python DataflowRunner:
>>>>>>>>>
>>>>>>>>> *PValueCache*
>>>>>>>>> - Why does this exist?
>>>>>>>>
>>>>>>>> This is historical baggage from the (long gone) first direct runner, when actual computed PCollections were cached, and the DataflowRunner inherited it.
>>>>>>>>
>>>>>>>>> *DataflowRunner*
>>>>>>>>> - I see that the DataflowRunner defines some PTransforms as runner-specific primitives by returning a PCollection.from_(...) in apply_ methods. Then in the run_ methods, it references the PValueCache to add steps.
>>>>>>>>>   - How does this add steps?
>>>>>>>>>   - Where does it cache the values to?
>>>>>>>>>   - How does the runner harness pick up these cached values to create new steps?
>>>>>>>>>   - How is this information communicated to the runner harness?
>>>>>>>>> - Why do the following transforms need to be overridden: GroupByKey, WriteToBigQuery, CombineValues, Read?
>>>>>>>>
>>>>>>>> Each of these four has a different implementation on Dataflow.
>>>>>>>>
>>>>>>>>> - Why doesn't the ParDo transform need to be overridden? I see that it has a run_ method but no apply_ method.
>>>>>>>>
>>>>>>>> apply_ is called at pipeline construction time; all of these should be replaced by PTransformOverrides. run_ is called after pipeline construction to actually build up the Dataflow graph.
>>>>>>>>
>>>>>>>>> *Possible fixes*
>>>>>>>>> I was thinking of getting rid of the apply_ and run_ methods and replacing those with a PTransformOverride and a simple PipelineVisitor, respectively. Is this feasible? Am I missing any assumptions that don't make this feasible?
>>>>>>>>
>>>>>>>> If we're going to overhaul how the runner works, it would be best to make DataflowRunner a direct translator from Beam runner API protos to Cloudv1b3 protos, rather than manipulate the intermediate Python representation (which no one wants to change for fear of messing up DataflowRunner and causing headaches for cross-language).
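A minimal sketch of what the PTransformOverride half of that cleanup could look like, against the public Beam Python API. MySource and its replacement are made up for illustration, and the exact override hook name has shifted across Beam releases.

    import apache_beam as beam
    from apache_beam.pipeline import PTransformOverride

    class MySource(beam.PTransform):
        # A toy composite that a runner might want to specialize.
        def expand(self, pbegin):
            return pbegin | beam.Create([1, 2, 3])

    class MySourceOverride(PTransformOverride):
        # Sketch of a runner-side replacement, analogous to the overrides a
        # runner would register instead of defining apply_ methods.
        def matches(self, applied_ptransform):
            return isinstance(applied_ptransform.transform, MySource)

        def get_replacement_transform(self, ptransform):
            # Newer Beam releases prefer
            # get_replacement_transform_for_applied_ptransform.
            return beam.Create([10, 20, 30])  # stand-in for a runner primitive

    p = beam.Pipeline()
    _ = p | 'ReadSomething' >> MySource()
    p.replace_all([MySourceOverride()])  # what a runner does at construction time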

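And a sketch of the "simple PipelineVisitor" side: walking the constructed graph and collecting one record per leaf transform. The step dict shape is invented for illustration; real Dataflow v1b3 Step messages carry far more.

    import apache_beam as beam
    from apache_beam.pipeline import PipelineVisitor

    class StepCollectingVisitor(PipelineVisitor):
        # Records one entry per leaf transform; a real translator would emit
        # Dataflow Step definitions here instead of plain dicts.
        def __init__(self):
            self.steps = []

        def visit_transform(self, transform_node):
            self.steps.append({
                'name': transform_node.full_label,
                'kind': type(transform_node.transform).__name__,
            })

    p = beam.Pipeline()
    _ = p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x * 2)

    visitor = StepCollectingVisitor()
    p.visit(visitor)
    print(visitor.steps)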