On Thu, Apr 25, 2019 at 6:04 AM jincheng sun <sunjincheng...@gmail.com> wrote:
>
> Hi Robert,
>
> In addition to the questions raised by Dian, I would also like to know what
> difficult problems the Py4j approach will encounter when adding UDF support,
> which you mentioned as follows:
>
>> Using something like Py4j is an easy way to get up and running, especially 
>> for a very faithful API, but the instant one wants to add UDFs one hits a 
>> cliff of sorts (which is surmountable, but likely a lot harder than having 
>> gone the above approach).
>
> I would appreciate it if you could share more specific cases.

The orchestration involved in supporting UDFs is non-trivial. I think
it is true that a lot of effort can be saved by re-using significant
portions of the design, concepts, and even implementation we already
have for Beam. Still, rebuilding it out of the individual pieces
(likely necessary because Py4j hooks in below the DAG level) is
probably harder, both initially and on an ongoing basis, than simply
leveraging the complete, working package.

> Dian Fu <dian0511...@gmail.com> wrote on Thu, Apr 25, 2019 at 11:53 AM:
>>
>> Thanks everyone for the discussion here.
>>
>> Regarding having the Java/Scala UDFs and the built-in UDFs execute in the
>> current Flink way (directly in the JVM, not via RPC), I share the same thoughts
>> as Max and Robert, and I think it will not be a big problem. From the
>> design doc, I guess the main reason to take the Py4J way instead of the DAG
>> way at present is that the DAG has limitations in some scenarios, such as
>> interactive programming, which may be a strong requirement for data scientists.
>>
>> > In addition (and I'll admit this is rather subjective) it seems to me one 
>> > of the primary values of a table-like API in a given language (vs. just 
>> > using (say) plain old SQL itself via a console) is the ability to embed it 
>> > in a larger pipeline, or at least drop in operations that are not (as) 
>> > naturally expressed in the "table way," including existing libraries. In 
>> > other words, a full SDK. The Py4j wrapping doesn't extend itself to such 
>> > integration nearly as easily.
>>
>>
>> Hi Robert, regarding "a larger pipeline", do you mean translating a
>> table-like API job from/to another kind of API job, or embedding third-party
>> libraries into a table-like API job via UDFs? Could you kindly explain why
>> this would be a problem for Py4J and would not be a problem if the job is
>> expressed as a DAG?
>>
>> Thanks,
>> Dian
>>
>>
>> > On Apr 25, 2019, at 12:16 AM, Robert Bradshaw <rober...@google.com> wrote:
>> >
>> > Thanks for the meeting summary, Stephan. Sounds like you covered a lot of 
>> > ground. Some more comments below, adding onto what Max has said.
>> >
>> > On Wed, Apr 24, 2019 at 3:20 PM Maximilian Michels <m...@apache.org> wrote:
>> > >
>> > > Hi Stephan,
>> > >
>> > > This is exciting! Thanks for sharing. The inter-process communication
>> > > code looks like the most natural choice as a common ground. To go
>> > > further, there are indeed some challenges to solve.
>> >
>> > It certainly does make sense to share this work, though to me it seems
>> > like a rather low level at which to integrate.
>> >
>> > > > => Biggest question is whether the language-independent DAG is 
>> > > > expressive enough to capture all the expressions that we want to map 
>> > > > directly to Table API expressions. Currently much is hidden in opaque 
>> > > > UDFs. Kenn mentioned the structure should be flexible enough to 
>> > > > capture more expressions transparently.
>> > >
>> > > Just to add some context on how this could be done, there is the concept of
>> > > a FunctionSpec, which is part of a transform in the DAG. A FunctionSpec
>> > > contains a URN and a payload. A FunctionSpec can be either (1)
>> > > translated by the Runner directly, e.g. mapped to Table API concepts, or (2)
>> > > run as a user-defined function with an Environment. It could be feasible
>> > > for Flink to choose the direct path, whereas Beam Runners would leverage
>> > > the more generic approach using UDFs. Granted, compatibility across
>> > > Flink and Beam would only work if both of the translation paths yielded
>> > > the same semantics.
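
For illustration only, the two paths described above could be sketched roughly as follows. This is a minimal sketch, not Beam's or Flink's actual code: the `FunctionSpec` class here is a plain stand-in for the real message, and the URNs, the `NATIVE_TRANSLATIONS` registry, and the `translate` function are all hypothetical names made up for the example.

```python
from dataclasses import dataclass
from typing import Callable, Dict


@dataclass(frozen=True)
class FunctionSpec:
    """Stand-in for the URN + payload pair attached to a transform."""
    urn: str
    payload: bytes


# Hypothetical registry: URNs this runner knows how to translate directly.
NATIVE_TRANSLATIONS: Dict[str, Callable[[bytes], str]] = {
    "example:table:upper:v1": lambda payload: f"UPPER({payload.decode()})",
}


def translate(spec: FunctionSpec) -> str:
    """Pick path (1) if the URN is known, otherwise fall back to path (2)."""
    native = NATIVE_TRANSLATIONS.get(spec.urn)
    if native is not None:
        # Path (1): the runner maps the function onto its own concepts.
        return native(spec.payload)
    # Path (2): treat the payload as an opaque UDF to run in an Environment.
    return f"EXTERNAL_UDF[{spec.urn}]"


direct = translate(FunctionSpec("example:table:upper:v1", b"name"))
opaque = translate(FunctionSpec("example:udf:pickled:v1", b"\x00\x01"))
```

Under these assumptions, a runner that recognizes the URN never needs to look inside the payload as user code, while an unknown URN is simply handed to the generic UDF path.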
>> >
>> > To elaborate a bit on this, Beam DAGs are built up by applying Transforms 
>> > (basically operations) to PCollections (the equivalent of 
>> > dataset/datastream), but the key point here is that these transforms are 
>> > often composite operations that expand out into smaller subtransforms. 
>> > This expansion happens during pipeline construction, but with the recent 
>> > work on cross language pipelines can happen out of process. This is one 
>> > point of extensibility. Secondly, and importantly, this composite 
>> > structure is preserved in the DAG, and so a runner is free to ignore the 
>> > provided expansion and supply its own (so long as semantically it produces 
>> > exactly the same output). These composite operations can be identified by 
>> > arbitrary URNs + payloads, and any runner that does not understand them 
>> > simply uses the pre-provided expansion.
>> >
>> > The existing Flink runner operates on exactly this principle, translating 
>> > URNs for the leaf operations (Map, Flatten, ...) as well as some 
>> > composites it can do better (e.g. Reshard). It is intentionally easy to 
>> > define and add new ones. This actually seems the easier approach (to me at 
>> > least, but that's probably heavily influenced by what I'm familiar with 
>> > vs. what I'm not).
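
As a rough sketch of the rule described above (a runner translates the URNs it understands and otherwise falls back to the pre-provided expansion), something like the following could be imagined. The `Transform` class, the `plan` function, and every URN below are hypothetical and chosen only for this example; they are not taken from Beam or from the Flink runner.

```python
from dataclasses import dataclass, field
from typing import List, Set


@dataclass
class Transform:
    """A node in the DAG: a URN plus an optional pre-provided expansion."""
    urn: str
    expansion: List["Transform"] = field(default_factory=list)


def plan(transform: Transform, understood: Set[str]) -> List[str]:
    """Return, in order, the URNs the runner would translate directly."""
    if transform.urn in understood:
        # The runner supplies its own implementation and ignores the expansion.
        return [transform.urn]
    if not transform.expansion:
        raise ValueError(f"runner cannot translate leaf transform {transform.urn!r}")
    steps: List[str] = []
    for sub in transform.expansion:
        steps.extend(plan(sub, understood))
    return steps


# Hypothetical URNs, made up for this example.
reshard = Transform(
    "example:composite:reshard",
    [
        Transform("example:leaf:add_random_key"),
        Transform("example:leaf:group_by_key"),
        Transform("example:leaf:drop_key"),
    ],
)
leaves = {
    "example:leaf:add_random_key",
    "example:leaf:group_by_key",
    "example:leaf:drop_key",
}

generic_plan = plan(reshard, leaves)
native_plan = plan(reshard, leaves | {"example:composite:reshard"})
```

Here `generic_plan` lists the three leaf URNs from the expansion, while `native_plan` contains only the composite URN, because the second runner claims to understand the composite and so substitutes its own implementation.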
>> >
>> > As for how well this maps onto the Flink Tables API, part of that depends 
>> > on how much of the API is the operations themselves, and how much is 
>> > concerning configuration/environment/etc. which is harder to talk about in 
>> > an agnostic way.
>> >
>> > Using something like Py4j is an easy way to get up and running, especially 
>> > for a very faithful API, but the instant one wants to add UDFs one hits a 
>> > cliff of sorts (which is surmountable, but likely a lot harder than having 
>> > gone the above approach). In addition (and I'll admit this is rather 
>> > subjective) it seems to me one of the primary values of a table-like API 
>> > in a given language (vs. just using (say) plain old SQL itself via a 
>> > console) is the ability to embed it in a larger pipeline, or at least drop 
>> > in operations that are not (as) naturally expressed in the "table way," 
>> > including existing libraries. In other words, a full SDK. The Py4j 
>> > wrapping doesn't extend itself to such integration nearly as easily.
>> >
>> > But I really do understand the desire to not block immediate work (and 
>> > value) for a longer term solution.
>> >
>> > > >  If the DAG is generic enough to capture the additional information, 
>> > > > we probably still need some standardization, so that all the different 
>> > > > language APIs represent their expressions the same way
>> > >
>> > > I wonder whether that's necessary as a first step. I think it would be
>> > > fine for Flink to have its own way to represent API concepts in the Beam
>> > > DAG which Beam Runners may not be able to understand. We could then
>> > > successively add the capability for these transforms to run with Beam.
>> > >
>> > > >  Similarly, it makes sense to standardize the type system (and type 
>> > > > inference) as far as built-in expressions and their interaction with 
>> > > > UDFs are concerned. The Flink Table API and Blink teams found this to 
>> > > > be essential for a consistent API behavior. This would not prevent 
>> > > > all-UDF programs from still using purely binary/opaque types.
>> > >
>> > > Beam has a set of standard coders which can be used across languages. We
>> > > will have to expand those to play well with Flink's:
>> > > https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tableApi.html#data-types
>> > >
>> > > I think we will need to exchange more ideas to work out a model that
>> > > will work for both Flink and Beam. A regular meeting could be helpful.
>> >
>> > +1, I think this would be really good for both this effort and general 
>> > collaboration between the Beam and Flink communities.
>> >
>> > > Thanks,
>> > > Max
>> > >
>> > > On 23.04.19 21:23, Stephan Ewen wrote:
>> > > > Hi all!
>> > > >
>> > > > Below are my notes on the discussion last week on how to collaborate
>> > > > between Beam and Flink.
>> > > > The discussion was between Tyler, Kenn, Luke, Ahmed, Xiaowei, Shaoxuan,
>> > > > Jincheng, and me.
>> > > >
>> > > > This represents my understanding of the discussion, please augment this
>> > > > where I missed something or where your conclusion was different.
>> > > >
>> > > > Best,
>> > > > Stephan
>> > > >
>> > > > =======================================================
>> > > >
>> > > > *Beam's Python and Portability Framework*
>> > > >
>> > > >    - Portability core to Beam
>> > > >    - Language independent dataflow DAG that is defined via ProtoBuf
>> > > >    - DAG can be generated from various languages (Java, Python, Go)
>> > > >    - The DAG describes the pipelines and contains additional parameters
>> > > > to describe each operator, and contains artifacts that need to be
>> > > > deployed / executed as part of an operator execution.
>> > > >    - Operators execute in language-specific containers, data is
>> > > > exchanged between the language-specific container and the runner
>> > > > container (JVM) via gRPC.
>> > > >
>> > > > *Flink's desiderata for Python API*
>> > > >
>> > > >    - Python API should mirror Java / Scala Table API
>> > > >    - All relational expressions that correspond to built-in functions
>> > > > should be translated to corresponding expressions in the Table API. That
>> > > > way the planner generates Java code for the data types and built-in
>> > > > expressions, meaning no Python code is necessary during execution
>> > > >    - UDFs should be supported and run similarly to Beam's approach
>> > > >    - Python programs should be similarly created and submitted/deployed
>> > > > as Java / Scala programs (CLI, web, containerized, etc.)
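
To make the second point in the list above concrete, here is a minimal sketch of how Python-side expressions could be captured as data rather than executed in Python. It does not use Py4J or any Flink API: the `Expr` class, the `col` helper, and the textual output format are assumptions invented for this example, standing in for whatever representation would actually be handed to the planner.

```python
from dataclasses import dataclass


def _render(value) -> str:
    """Render an operand: nested expressions as-is, strings as quoted literals."""
    if isinstance(value, Expr):
        return value.text
    if isinstance(value, str):
        return "'" + value.replace("'", "''") + "'"
    return repr(value)


@dataclass(frozen=True)
class Expr:
    """Hypothetical expression node that records operations instead of running them."""
    text: str

    def __add__(self, other) -> "Expr":
        return Expr(f"({self.text} + {_render(other)})")

    def __gt__(self, other) -> "Expr":
        return Expr(f"({self.text} > {_render(other)})")

    def upper(self) -> "Expr":
        # A built-in function: recorded by name so the planner can generate code.
        return Expr(f"UPPER({self.text})")


def col(name: str) -> Expr:
    """Refer to a column by name."""
    return Expr(name)


predicate = col("amount") + 1 > 10
projection = col("name").upper()
```

In this sketch `predicate` and `projection` hold only descriptions of the computation, so nothing written in Python would need to run during execution; only opaque UDFs would require a Python process.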
>> > > >
>> > > > *Consensus to share inter-process communication code*
>> > > >
>> > > >    - Crucial code for robust setup and high performance data exchange
>> > > > across processes
>> > > >    - The code for the SDK harness, the artifact bootstrapping, and the
>> > > > data exchange make sense to share.
>> > > >    - Ongoing discussion whether this can be a dedicated module with slim
>> > > > dependencies in Beam
>> > > >
>> > > > *Potential Long Term Perspective: Share language-independent DAG
>> > > > representation*
>> > > >
>> > > >    - Beam's language independent DAG could become a standard
>> > > > representation used in both projects
>> > > >    - Flink would need a way to receive that DAG, map it to the Table
>> > > > API, and execute it from there
>> > > >    - The DAG would need to have a standardized representation of
>> > > > functions and expressions that then get mapped to Table API expressions
>> > > > to let the planner optimize those and generate Java code for those
>> > > >    - Similar to how UDFs are supported in the Table API, there would be
>> > > > additional "external UDFs" that would go through the above mentioned
>> > > > inter-process communication layer
>> > > >
>> > > >    - _Advantages:_
>> > > >      => Flink and Beam could share more language bindings
>> > > >      => Flink would execute Beam portability programs fast, without
>> > > > intermediate abstraction and directly in the JVM for many operators.
>> > > >           Abstraction is necessary around UDFs and to bridge between
>> > > > serializers / coders, etc.
>> > > >
>> > > >    - _Open issues:_
>> > > >      => Biggest question is whether the language-independent DAG is
>> > > > expressive enough to capture all the expressions that we want to map
>> > > > directly to Table API expressions. Currently much is hidden in opaque
>> > > > UDFs. Kenn mentioned the structure should be flexible enough to capture
>> > > > more expressions transparently.
>> > > >
>> > > >      => If the DAG is generic enough to capture the additional
>> > > > information, we probably still need some standardization, so that all
>> > > > the different language APIs represent their expressions the same way
>> > > >      => Similarly, it makes sense to standardize the type system (and
>> > > > type inference) as far as built-in expressions and their interaction
>> > > > with UDFs are concerned. The Flink Table API and Blink teams found this
>> > > > to be essential for a consistent API behavior. This would not prevent
>> > > > all-UDF programs from still using purely binary/opaque types.
>> > > >
>> > > >   =>  We need to create a Python API that follows the same structure as
>> > > > Flink's Table API that produces the language-independent DAG
>> > > >
>> > > > *Short-term approach in Flink*
>> > > >
>> > > >    - Goal is to not block Flink's Python effort on the long term
>> > > > approach and the necessary design and evolution of the
>> > > > language-independent DAG.
>> > > >    - Depending on what the outcome of above investigation is, Flink may
>> > > > initially go with a simple approach to map the Python Table API to the
>> > > > Java Table API via Py4J, as outlined in FLIP-38:
>> > > > https://docs.google.com/document/d/1ybYt-0xWRMa1Yf5VsuqGRtOfJBz4p74ZmDxZYg3j_h8
