I agree this has been a major source of pain. The primary cause of issues is the conversion from Beam protos back to the SDK objects (which don't always have a good representation, especially for foreign-language components). In my experience, the SDK objects -> Beam proto conversions aren't generally a problem (except sometimes for objects that were previously converted from protos).
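To make the asymmetry concrete, here's roughly what the round trip looks like in the Python SDK. This is a minimal sketch (API details like the from_runner_api signature vary a bit across Beam versions), but it shows which direction is the fragile one:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners.direct.direct_runner import DirectRunner

p = beam.Pipeline(options=PipelineOptions())
_ = p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x + 1)

# SDK objects -> Beam proto: well defined, rarely a problem.
proto = p.to_runner_api()

# Beam proto -> SDK objects: every transform, coder, and windowing strategy
# must be rehydrated into a Python object. A transform expanded by, say, a
# Java expansion service has no faithful Python representation, so this is
# the direction where fidelity gets lost.
p2 = beam.Pipeline.from_runner_api(proto, DirectRunner(), PipelineOptions())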
In my opinion, the solution is to convert to Beam protos and never convert back. (Well, not until we get to the workers, but at that point we can confidently say that everything we need to decode does actually belong to the ambient environment.) As mentioned, Dataflow is being fixed, and the only other runner (in Python) that doesn't consume the Beam protos directly is the old direct runner (which Pablo is working on making obsolete, and which doesn't support cross-language anyway). So finally fixing Dataflow should be all we need to do. (Have we seen these issues on other runners?) On the Java side, I think all the optimization stuff works on the SDK representation, so care needs to be taken to make that conversion faithful, or that code needs to be converted to act on the protos directly.

As for why we went with the current approach: it's simply the fact that the SDK representation -> Dataflow v1beta3 translation predated any of the Beam proto stuff, and re-using that code seemed easier than updating the Dataflow service to accept Beam protos (or re-writing it), as we had the SDK representation for everything in hand (until cross-language came along, that is).

On Thu, Jul 2, 2020 at 3:11 AM Heejong Lee <[email protected]> wrote:

> On Wed, Jul 1, 2020 at 7:18 PM Robert Burke <[email protected]> wrote:
>
>> From the Go SDK side, it was built that way nearly from the start. Historically there was a direct SDK rep -> Dataflow rep conversion, but that's been replaced with an SDK rep -> Beam proto -> Dataflow rep conversion.
>>
>> In particular, this approach had a few benefits: it's easier to access local context for pipeline validation at construction time, to permit as early a failure as possible, which might be easier with native language constructs vs. Beam representations of them (e.g. DoFns not matching ParDo & Collection types, and similar). Protos are convenient, but impose certain structure on how the pipeline graph is handled. (This isn't to say an earlier conversion isn't possible; one can do almost anything in code, but it lets the structure be optimised for this case.)
>>
>> The big advantage of translating from Beam proto -> Dataflow rep is that the Dataflow rep can get the various unique IDs that are mandated for the Beam proto process.
>>
>> However, the same can't really be said for the other way around. A good question is "when should the unique IDs be assigned?"
>
> This is very true, and I would like to elaborate more on the source of friction when using external transforms. As Robert mentioned, the pipeline proto refers to each component by a unique ID, and the unique ID is only assigned when we convert the SDK pipeline object to the pipeline proto. Before XLang, the pipeline object to pipeline proto conversion happened one time, during the job submission phase. However, after XLang transforms were introduced, it also happens when we request expansion of external transforms from the expansion service. A unique ID generated for the expansion request can be embedded in the returned external proto and conflict later with other unique IDs generated for the job submission.
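To make the collision Heejong describes concrete, here is a toy model of counter-based ID assignment (purely illustrative; the generator and names are made up, and the real pipeline contexts are more involved, but the failure shape is the same):

import itertools

def make_id_generator():
    # Each conversion pass gets its own fresh counter.
    counter = itertools.count()
    return lambda kind: "ref_%s_%d" % (kind, next(counter))

# Pass 1: IDs minted while building the expansion request.
expansion_ids = make_id_generator()
external_coder_id = expansion_ids("Coder")    # "ref_Coder_0"

# Pass 2: job submission starts a fresh context and counts from zero again.
submission_ids = make_id_generator()
local_coder_id = submission_ids("Coder")      # also "ref_Coder_0"

# The ID embedded in the expanded external proto collides with a locally
# minted one when the component maps are merged at submission time.
assert external_coder_id == local_coder_id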
>
>> While I'm not working on adding XLang to the Go SDK directly (that would be our wonderful intern, Kevin), I've kind of pictured that the process was to provide the expansion service with unique placeholders if unable to provide the right IDs, and substitute them in the returned pipeline graph segment afterwards, once that is known. That is, we can be relatively certain that the expansion service will be self-consistent, but it's the responsibility of the SDK requesting the expansion to ensure they aren't colliding with the primary SDK's pipeline IDs.
>
> AFAIK, we're already doing this in the Java and Python SDKs. Not providing a "placeholder", but remembering which pipeline object maps to which unique ID used in the expanded component proto.
>
>> Otherwise, we could probably recommend a translation protocol (if one doesn't exist already, it probably does) and when XLang expansions are to happen in the SDK -> Beam proto process. So something like: Pass 1, intern all coders and PCollections; Pass 2, intern all DoFns and environments; Pass 3, expand XLang; etc.
>
> Not sure I understand correctly, but a following transform that consumes the output of an external transform needs some information, like the output PCollection information from the expanded external transform, during the pipeline construction phase.
>
>> The other half of this is what happens when going from Beam proto -> SDK. This happens during pipeline execution, but at least in the Go SDK it partly happens when creating the Dataflow rep. In particular, Coder reference values only have a populated ID when they've been "rehydrated" from the Beam proto, since the Beam proto is the first place where such IDs are correctly assigned.
>>
>> Tl;dr: I think the right question to sort out is when IDs should be expected to be assigned and available during pipeline construction.
>>
>> On Wed, Jul 1, 2020, 6:34 PM Luke Cwik <[email protected]> wrote:
>>
>>> It seems like we keep running into translation issues with XLang due to how it is represented in the SDK (e.g. Brian's work on the context map due to loss of coder IDs, Heejong's work related to missing environment IDs on windowing strategies).
>>>
>>> I understand that there is an effort that is Dataflow-specific, where the conversion of the Beam proto -> Dataflow API (v1b3) will help with some issues, but it still requires the SDK pipeline representation -> Beam proto conversion to occur correctly, which won't be fixed by the Dataflow-specific effort.
>>>
>>> Why did we go with the current approach?
>>>
>>> What other ways could we do this?
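P.S. Since the "remember the mapping / namespace the expanded IDs" idea comes up a few times above, here it is in sketch form. The helper and the prefixing scheme are made up for illustration (the real logic lives in each SDK's external-transform support), but it shows one way the submitting SDK can guarantee the expansion's IDs can't collide with its own:

import uuid
from apache_beam.portability.api import beam_runner_api_pb2

def namespace_components(components, prefix=None):
    """Returns a copy of `components` with every component ID prefixed.

    Illustrative only: rewrites the IDs in an expansion response under a
    unique prefix so they can't collide with IDs minted later at job
    submission. (Hypothetical helper, not Beam's actual implementation.)
    """
    prefix = prefix or "xlang_%s" % uuid.uuid4().hex[:8]
    renamed = beam_runner_api_pb2.Components()

    def rename(old_id):
        return "%s_%s" % (prefix, old_id)

    for old_id, coder in components.coders.items():
        new_coder = renamed.coders[rename(old_id)]
        new_coder.CopyFrom(coder)
        # IDs referenced *inside* a component must be rewritten too.
        new_coder.component_coder_ids[:] = [
            rename(c) for c in coder.component_coder_ids]
    # ... same for transforms, pcollections, windowing_strategies, and
    # environments, fixing up every cross-reference as we go.
    return renamed

(The elided part is the tedious bit: transforms, PCollections, windowing strategies, and environments all cross-reference each other by ID, so every reference has to be rewritten consistently.)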
