On Thu, Apr 25, 2019 at 6:04 AM jincheng sun <sunjincheng...@gmail.com> wrote:
>
> Hi Robert,
>
> In addition to the questions described by Dian, I also want to know what difficult problems Py4j's solution will encounter in adding UDF support, which you mentioned as follows:
>
>> Using something like Py4j is an easy way to get up and running, especially for a very faithful API, but the instant one wants to add UDFs one hits a cliff of sorts (which is surmountable, but likely a lot harder than having gone the above approach).
>
> I would appreciate it if you could share more specific cases.
The orchestration involved in supporting UDFs is non-trivial. I think it is true that a lot of effort can be saved by re-using significant portions of the design, concepts, and even implementation we already have for Beam, but re-building it out of the individual pieces (likely necessitated by Py4j having hooked in at a lower level than the DAG) is likely harder (initially and ongoing) than simply leveraging the complete, working package.

> Dian Fu <dian0511...@gmail.com> wrote on Thu, Apr 25, 2019 at 11:53 AM:
>>
>> Thanks everyone for the discussion here.
>>
>> Regarding the Java/Scala UDFs and the built-in UDFs executing in the current Flink way (directly in the JVM, not via RPC), I share the same thoughts as Max and Robert and I think it will not be a big problem. From the design doc, I guess the main reason to take the Py4J way instead of the DAG way at present is that the DAG has some limitations in scenarios such as interactive programming, which may be a strong requirement for data scientists.
>>
>> > In addition (and I'll admit this is rather subjective) it seems to me one of the primary values of a table-like API in a given language (vs. just using (say) plain old SQL itself via a console) is the ability to embed it in a larger pipeline, or at least drop in operations that are not (as) naturally expressed in the "table way," including existing libraries. In other words, a full SDK. The Py4j wrapping doesn't lend itself to such integration nearly as easily.
>>
>> Hi Robert, regarding "a larger pipeline", do you mean translating a table-like API job from/to another kind of API job, or embedding third-party libraries into a table-like API job via UDFs? Could you kindly explain why this would be a problem for Py4J but not a problem when the job is expressed with the DAG?
>>
>> Thanks,
>> Dian
>>
>>
>> > On Apr 25, 2019, at 12:16 AM, Robert Bradshaw <rober...@google.com> wrote:
>> >
>> > Thanks for the meeting summary, Stephan. Sounds like you covered a lot of ground. Some more comments below, adding onto what Max has said.
>> >
>> > On Wed, Apr 24, 2019 at 3:20 PM Maximilian Michels <m...@apache.org> wrote:
>> > >
>> > > Hi Stephan,
>> > >
>> > > This is exciting! Thanks for sharing. The inter-process communication code looks like the most natural choice as common ground. To go further, there are indeed some challenges to solve.
>> >
>> > It certainly does make sense to share this work, though it does seem to me like a rather low level to integrate at.
>> >
>> > > > => Biggest question is whether the language-independent DAG is expressive enough to capture all the expressions that we want to map directly to Table API expressions. Currently much is hidden in opaque UDFs. Kenn mentioned the structure should be flexible enough to capture more expressions transparently.
>> > >
>> > > Just to add some context on how this could be done, there is the concept of a FunctionSpec which is part of a transform in the DAG. A FunctionSpec contains a URN and a payload. A FunctionSpec can either be (1) translated by the Runner directly, e.g. mapped to Table API concepts, or (2) used to run a user-defined function with an Environment. It could be feasible for Flink to choose the direct path, whereas Beam Runners would leverage the more generic approach using UDFs. Granted, compatibility across Flink and Beam would only work if both of the translation paths yielded the same semantics.
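
To make the FunctionSpec mechanism described above a bit more concrete, here is a rough sketch of the two translation paths. It is purely illustrative -- the class and the URNs below are invented for this example and are not Beam's actual proto definitions:

    # Purely illustrative -- not Beam's actual proto API. It only shows the idea of a
    # (URN, payload) pair that a runner can either translate natively or hand off to a
    # language-specific environment as an opaque UDF.
    from dataclasses import dataclass

    @dataclass
    class FunctionSpec:
        urn: str        # identifies the operation; the URNs below are made up
        payload: bytes  # operation-specific parameters, or e.g. a pickled Python UDF

    # Operations this (hypothetical) runner knows how to map onto Table API concepts.
    NATIVE_URNS = {
        "example:expr:plus:v1": "built-in Table API 'plus' expression",
        "example:expr:lower:v1": "built-in Table API 'lowerCase' expression",
    }

    def translate(spec: FunctionSpec) -> str:
        # Path (1): the runner understands the URN and translates it directly.
        if spec.urn in NATIVE_URNS:
            return "translate directly to " + NATIVE_URNS[spec.urn]
        # Path (2): unknown URN -- execute the payload as a UDF in an Environment
        # (a language-specific SDK harness), exchanging data over gRPC.
        return "run payload as a UDF in a language-specific environment"

    print(translate(FunctionSpec("example:expr:plus:v1", b"")))
    print(translate(FunctionSpec("example:python:pickled_udf:v1", b"<pickled fn>")))

The key property is that the same kind of spec can take either path, so Flink could translate what it understands natively and fall back to the UDF path for everything else.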
>> >
>> > To elaborate a bit on this, Beam DAGs are built up by applying Transforms (basically operations) to PCollections (the equivalent of dataset/datastream), but the key point here is that these transforms are often composite operations that expand out into smaller subtransforms. This expansion happens during pipeline construction, but with the recent work on cross-language pipelines it can happen out of process. This is one point of extensibility. Secondly, and importantly, this composite structure is preserved in the DAG, so a runner is free to ignore the provided expansion and supply its own (so long as semantically it produces exactly the same output). These composite operations can be identified by arbitrary URNs + payloads, and any runner that does not understand them simply uses the pre-provided expansion.
>> >
>> > The existing Flink runner operates on exactly this principle, translating URNs for the leaf operations (Map, Flatten, ...) as well as some composites it can do better (e.g. Reshard). It is intentionally easy to define and add new ones. This actually seems the easier approach (to me at least, but that's probably heavily influenced by what I'm familiar with vs. what I'm not).
>> >
>> > As for how well this maps onto the Flink Table API, part of that depends on how much of the API is the operations themselves, and how much concerns configuration/environment/etc., which is harder to talk about in an agnostic way.
>> >
>> > Using something like Py4j is an easy way to get up and running, especially for a very faithful API, but the instant one wants to add UDFs one hits a cliff of sorts (which is surmountable, but likely a lot harder than having gone the above approach). In addition (and I'll admit this is rather subjective) it seems to me one of the primary values of a table-like API in a given language (vs. just using (say) plain old SQL itself via a console) is the ability to embed it in a larger pipeline, or at least drop in operations that are not (as) naturally expressed in the "table way," including existing libraries. In other words, a full SDK. The Py4j wrapping doesn't lend itself to such integration nearly as easily.
>> >
>> > But I really do understand the desire to not block immediate work (and value) for a longer-term solution.
>> >
>> > > > If the DAG is generic enough to capture the additional information, we probably still need some standardization, so that all the different language APIs represent their expressions the same way
>> > >
>> > > I wonder whether that's necessary as a first step. I think it would be fine for Flink to have its own way to represent API concepts in the Beam DAG which Beam Runners may not be able to understand. We could then successively add the capability for these transforms to run with Beam.
>> > >
>> > > > Similarly, it makes sense to standardize the type system (and type inference) as far as built-in expressions and their interaction with UDFs are concerned. The Flink Table API and Blink teams found this to be essential for consistent API behavior. This would not prevent all-UDF programs from still using purely binary/opaque types.
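
As a rough illustration of the difference between an expression that is captured transparently (and can therefore be type-checked, optimized, and code-generated by the planner) and one hidden in an opaque UDF, consider the sketch below. The classes and the URN are invented for illustration and are not actual Beam or Flink types:

    # Illustrative only -- these classes are not actual Beam or Flink APIs.
    from dataclasses import dataclass
    from typing import List, Union

    @dataclass
    class Field:
        name: str          # reference to an input column

    @dataclass
    class Literal:
        value: object
        type: str          # standardized type name, e.g. "BIGINT"

    @dataclass
    class Call:
        function_urn: str  # identifies a built-in function; the URN below is made up
        args: List["Expression"]

    @dataclass
    class OpaqueUdf:
        payload: bytes     # e.g. a pickled Python function
        environment: str   # where it has to run, e.g. a Python SDK harness

    Expression = Union[Field, Literal, Call, OpaqueUdf]

    # "a + 1" captured transparently: a planner can see into it and generate Java code.
    transparent = Call("example:expr:plus:v1", [Field("a"), Literal(1, "BIGINT")])

    # The same logic hidden in a UDF: the planner can only ship it to the harness.
    opaque = OpaqueUdf(payload=b"<pickled lambda a: a + 1>", environment="python-harness")

The more of the API surface that can be expressed in the first form, the more the planner can do; everything in the second form necessarily goes through the inter-process UDF path.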
>> > >
>> > > Beam has a set of standard coders which can be used across languages. We will have to expand those to play well with Flink's: https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tableApi.html#data-types
>> > >
>> > > I think we will need to exchange more ideas to work out a model that will work for both Flink and Beam. A regular meeting could be helpful.
>> >
>> > +1, I think this would be really good for both this effort and general collaboration between the Beam and Flink communities.
>> >
>> > > Thanks,
>> > > Max
>> > >
>> > > On 23.04.19 21:23, Stephan Ewen wrote:
>> > > > Hi all!
>> > > >
>> > > > Below are my notes on the discussion last week on how to collaborate between Beam and Flink. The discussion was between Tyler, Kenn, Luke, Ahmed, Xiaowei, Shaoxuan, Jincheng, and me.
>> > > >
>> > > > This represents my understanding of the discussion, please augment this where I missed something or where your conclusion was different.
>> > > >
>> > > > Best,
>> > > > Stephan
>> > > >
>> > > > =======================================================
>> > > >
>> > > > *Beam's Python and Portability Framework*
>> > > >
>> > > > - Portability is core to Beam
>> > > > - Language-independent dataflow DAG that is defined via ProtoBuf
>> > > > - DAG can be generated from various languages (Java, Python, Go)
>> > > > - The DAG describes the pipelines and contains additional parameters to describe each operator, and contains artifacts that need to be deployed / executed as part of an operator execution.
>> > > > - Operators execute in language-specific containers; data is exchanged between the language-specific container and the runner container (JVM) via gRPC.
>> > > >
>> > > > *Flink's desiderata for Python API*
>> > > >
>> > > > - Python API should mirror the Java / Scala Table API
>> > > > - All relational expressions that correspond to built-in functions should be translated to corresponding expressions in the Table API. That way the planner generates Java code for the data types and built-in expressions, meaning no Python code is necessary during execution.
>> > > > - UDFs should be supported and run similarly to Beam's approach
>> > > > - Python programs should be created and submitted/deployed similarly to Java / Scala programs (CLI, web, containerized, etc.)
>> > > >
>> > > > *Consensus to share inter-process communication code*
>> > > >
>> > > > - Crucial code for robust setup and high-performance data exchange across processes
>> > > > - The code for the SDK harness, the artifact bootstrapping, and the data exchange makes sense to share.
>> > > > - Ongoing discussion whether this can be a dedicated module with slim dependencies in Beam
>> > > >
>> > > > *Potential Long Term Perspective: Share language-independent DAG representation*
>> > > >
>> > > > - Beam's language-independent DAG could become a standard representation used in both projects
>> > > > - Flink would need a way to receive that DAG, map it to the Table API, and execute it from there
>> > > > - The DAG would need to have a standardized representation of functions and expressions that then get mapped to Table API expressions, to let the planner optimize them and generate Java code for them
>> > > > - Similar to how UDFs are supported in the Table API, there would be additional "external UDFs" that would go through the above-mentioned inter-process communication layer
>> > > >
>> > > > - _Advantages:_
>> > > >   => Flink and Beam could share more language bindings
>> > > >   => Flink would execute Beam portability programs fast, without intermediate abstraction and directly in the JVM for many operators. Abstraction is necessary around UDFs and to bridge between serializers / coders, etc.
>> > > >
>> > > > - _Open issues:_
>> > > >   => Biggest question is whether the language-independent DAG is expressive enough to capture all the expressions that we want to map directly to Table API expressions. Currently much is hidden in opaque UDFs. Kenn mentioned the structure should be flexible enough to capture more expressions transparently.
>> > > >   => If the DAG is generic enough to capture the additional information, we probably still need some standardization, so that all the different language APIs represent their expressions the same way
>> > > >   => Similarly, it makes sense to standardize the type system (and type inference) as far as built-in expressions and their interaction with UDFs are concerned. The Flink Table API and Blink teams found this to be essential for consistent API behavior. This would not prevent all-UDF programs from still using purely binary/opaque types.
>> > > >   => We need to create a Python API that follows the same structure as Flink's Table API and that produces the language-independent DAG
>> > > >
>> > > > *Short-term approach in Flink*
>> > > >
>> > > > - Goal is to not block Flink's Python effort on the long-term approach and the necessary design and evolution of the language-independent DAG.
>> > > > - Depending on the outcome of the above investigation, Flink may initially go with a simple approach to map the Python Table API to the Java Table API via Py4J, as outlined in FLIP-38: https://docs.google.com/document/d/1ybYt-0xWRMa1Yf5VsuqGRtOfJBz4p74ZmDxZYg3j_h8
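
To make the FLIP-38 / Py4J short-term option above a bit more concrete, here is a minimal sketch of what such a wrapping could look like. The wrapper classes and the gateway setup are assumptions made for illustration, not the actual FLIP-38 design; it presumes a Py4J GatewayServer has been started on the Java side with a TableEnvironment as its entry point:

    # Minimal sketch of the Py4J wrapping approach. Every Python-side call is simply
    # forwarded to the corresponding Java Table API object in the JVM; no Python code
    # runs during execution. Class names and the entry-point setup are illustrative
    # assumptions, not the actual FLIP-38 API.
    from py4j.java_gateway import JavaGateway

    class Table:
        """Thin Python handle around a Java Table object."""
        def __init__(self, j_table):
            self._j_table = j_table

        def select(self, fields):
            return Table(self._j_table.select(fields))     # delegates to Java Table.select

        def filter(self, predicate):
            return Table(self._j_table.filter(predicate))  # delegates to Java Table.filter

    class TableEnvironment:
        """Thin Python handle around a Java TableEnvironment."""
        def __init__(self, j_tenv):
            self._j_tenv = j_tenv

        def scan(self, table_name):
            return Table(self._j_tenv.scan(table_name))

    if __name__ == "__main__":
        gateway = JavaGateway()                        # connects to the JVM-side GatewayServer
        t_env = TableEnvironment(gateway.entry_point)  # assumes the entry point exposes a TableEnvironment
        orders = t_env.scan("Orders").filter("amount > 10").select("user, amount")

This also illustrates both sides of the earlier discussion: the wrapping is thin and faithful to the Java API, but any Python UDF would need a separate mechanism (e.g. Py4J callbacks or a Beam-style harness) to execute Python code at runtime, which is where the UDF "cliff" mentioned above comes in.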