Hi Stephan,

This is exciting! Thanks for sharing. The inter-process communication code looks like the most natural common ground. To go further, there are indeed some challenges to solve.

=> The biggest question is whether the language-independent DAG is expressive enough to capture all the expressions that we want to map directly to Table API expressions. Currently much is hidden in opaque UDFs. Kenn mentioned the structure should be flexible enough to capture more expressions transparently.

Just to add some context on how this could be done: there is the concept of a FunctionSpec, which is part of a transform in the DAG. A FunctionSpec contains a URN and a payload. A FunctionSpec can either (1) be translated directly by the Runner, e.g. mapped to Table API concepts, or (2) be executed as a user-defined function in an Environment. It could be feasible for Flink to choose the direct path, whereas Beam Runners would use the more generic UDF-based approach. Granted, compatibility between Flink and Beam would only work if both translation paths yielded the same semantics.
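To make the two paths concrete, here is a minimal Python sketch of how a runner might dispatch on a FunctionSpec. It only mirrors the idea (a URN plus an opaque payload); it is not Beam's protobuf API. The URN `example:builtin:upper:v1`, the `resolve` function, and both "runners" are hypothetical. The final assertion expresses the compatibility condition above: the direct path and the UDF path must agree.

```python
from dataclasses import dataclass
from typing import Callable, Dict

Fn = Callable[[str], str]


@dataclass(frozen=True)
class FunctionSpec:
    urn: str        # names the function
    payload: bytes  # URN-specific data, opaque to runners that do not know the URN


# Hypothetical URN, for illustration only (not defined by Beam or Flink).
UPPER_URN = "example:builtin:upper:v1"


def resolve(spec: FunctionSpec,
            native: Dict[str, Callable[[bytes], Fn]],
            environment: Callable[[FunctionSpec], Fn]) -> Fn:
    """Pick an execution path for a FunctionSpec (hypothetical helper).

    (1) If the runner understands the URN, it builds a native implementation.
    (2) Otherwise the spec is handed to a user-defined function environment.
    """
    factory = native.get(spec.urn)
    if factory is not None:
        return factory(spec.payload)
    return environment(spec)


# Path (1): a runner that maps the URN to its own built-in function.
flink_native: Dict[str, Callable[[bytes], Fn]] = {UPPER_URN: lambda payload: str.upper}


# Path (2): a stand-in for an SDK environment that knows how to run the UDF.
def sdk_environment(spec: FunctionSpec) -> Fn:
    udfs: Dict[str, Fn] = {UPPER_URN: lambda s: s.upper()}
    if spec.urn not in udfs:
        raise KeyError("no UDF registered for " + spec.urn)
    return udfs[spec.urn]


spec = FunctionSpec(UPPER_URN, b"")
direct = resolve(spec, flink_native, sdk_environment)  # runner knows the URN
via_udf = resolve(spec, {}, sdk_environment)           # runner does not
assert direct("beam") == via_udf("beam") == "BEAM"     # both paths must agree
```

The design point is that the fallback always exists, so a runner can translate only the URNs it understands and still execute the whole DAG.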

If the DAG is generic enough to capture the additional information, we probably still need some standardization, so that all the different language APIs represent their expressions the same way.

I wonder whether that's necessary as a first step. I think it would be fine for Flink to have its own way to represent API concepts in the Beam DAG which Beam Runners may not be able to understand. We could then successively add the capability for these transforms to run with Beam.

Similarly, it makes sense to standardize the type system (and type inference) as far as built-in expressions and their interaction with UDFs are concerned. The Flink Table API and Blink teams found this to be essential for a consistent API behavior. This would not prevent all-UDF programs from still using purely binary/opaque types.

Beam has a set of standard coders which can be used across languages. We will have to expand those to play well with Flink's data types: https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tableApi.html#data-types

I think we will need to exchange more ideas to work out a model that will work for both Flink and Beam. A regular meeting could be helpful.

Thanks,
Max

On 23.04.19 21:23, Stephan Ewen wrote:
Hi all!

Below are my notes on the discussion last week on how to collaborate between Beam and Flink. The discussion was between Tyler, Kenn, Luke, Ahmed, Xiaowei, Shaoxuan, Jincheng, and me.

This represents my understanding of the discussion, please augment this where I missed something or where your conclusion was different.

Best,
Stephan

=======================================================

*Beam's Python and Portability Framework*

   - Portability is core to Beam
   - Language-independent dataflow DAG that is defined via ProtoBuf
   - DAG can be generated from various languages (Java, Python, Go)
   - The DAG describes the pipelines and contains additional parameters to describe each operator, and contains artifacts that need to be deployed / executed as part of an operator execution.
   - Operators execute in language-specific containers; data is exchanged between the language-specific container and the runner container (JVM) via gRPC.
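The data exchanged over gRPC between the runner and a language-specific container is a stream of encoded elements packed into byte chunks. The following is a minimal sketch, under that assumption, of one way to frame several already-encoded elements in a single chunk using a varint length prefix per element, similar in spirit to Beam's LengthPrefixCoder. The function names are hypothetical and this is not the Beam Fn API implementation.

```python
from typing import Iterable, List, Tuple


def write_varint(value: int, out: bytearray) -> None:
    """Append a non-negative int as an unsigned base-128 varint (7 bits per byte)."""
    if value < 0:
        raise ValueError("lengths must be non-negative")
    while True:
        low = value & 0x7F
        value >>= 7
        if value:
            out.append(low | 0x80)  # high bit set: more bytes follow
        else:
            out.append(low)
            return


def read_varint(buf: bytes, pos: int) -> Tuple[int, int]:
    """Decode a varint starting at `pos`; return (value, next position)."""
    result, shift = 0, 0
    while True:
        if pos >= len(buf):
            raise ValueError("truncated varint")
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7


def encode_elements(elements: Iterable[bytes]) -> bytes:
    """Frame already-encoded elements into one chunk: <varint length><bytes>..."""
    out = bytearray()
    for element in elements:
        write_varint(len(element), out)
        out += element
    return bytes(out)


def decode_elements(chunk: bytes) -> List[bytes]:
    """Split a chunk produced by encode_elements back into elements."""
    elements: List[bytes] = []
    pos = 0
    while pos < len(chunk):
        length, pos = read_varint(chunk, pos)
        end = pos + length
        if end > len(chunk):
            raise ValueError("truncated element")
        elements.append(chunk[pos:end])
        pos = end
    return elements
```

A length prefix lets the receiving side split a chunk without understanding the element encoding, which is what allows the runner to pass opaque (UDF-typed) data through unchanged.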

*Flink's desiderata for Python API*

   - Python API should mirror Java / Scala Table API
   - All relational expressions that correspond to built-in functions should be translated to corresponding expressions in the Table API. That way the planner generates Java code for the data types and built-in expressions, meaning no Python code is necessary during execution
   - UDFs should be supported and run similarly to Beam's approach
   - Python programs should be created and submitted/deployed similarly to Java / Scala programs (CLI, web, containerized, etc.)
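A minimal sketch of how a Python API could capture built-in expressions as a tree and render them to a string for the Java side, so that no Python code runs during execution. `col` and `Expr` are hypothetical names; the rendered strings are only meant to resemble the Table API's string expression syntax, not to match it exactly.

```python
from dataclasses import dataclass
from typing import Union


@dataclass(frozen=True)
class Expr:
    """A built-in expression node, kept as the string it renders to (hypothetical)."""
    text: str

    def _binary(self, op: str, other: "Operand") -> "Expr":
        # Parenthesize every binary node so precedence survives the round trip.
        return Expr("(" + self.text + " " + op + " " + _render(other) + ")")

    def __add__(self, other: "Operand") -> "Expr":
        return self._binary("+", other)

    def __mul__(self, other: "Operand") -> "Expr":
        return self._binary("*", other)

    def __gt__(self, other: "Operand") -> "Expr":
        return self._binary(">", other)


Operand = Union[Expr, int, float]


def _render(operand: Operand) -> str:
    if isinstance(operand, Expr):
        return operand.text
    if isinstance(operand, bool) or not isinstance(operand, (int, float)):
        raise TypeError("unsupported literal: " + repr(operand))
    return repr(operand)


def col(name: str) -> Expr:
    """Reference a column by name (hypothetical helper)."""
    return Expr(name)


# Only this string would cross the Python/JVM boundary, e.g. as a filter predicate.
predicate = (col("a") + 1) * col("b") > 10
print(predicate.text)  # (((a + 1) * b) > 10)
```

Because operator overloading only builds a description, the planner on the JVM side sees an ordinary expression it can optimize and generate code for; a UDF would be the only part that needs a Python process at runtime.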

*Consensus to share inter-process communication code*

   - Crucial code for robust setup and high-performance data exchange across processes
   - The code for the SDK harness, the artifact bootstrapping, and the data exchange makes sense to share.
   - Ongoing discussion whether this can be a dedicated module with slim dependencies in Beam

*Potential Long Term Perspective: Share language-independent DAG representation*

   - Beam's language-independent DAG could become a standard representation used in both projects
   - Flink would need a way to receive that DAG, map it to the Table API, and execute it from there
   - The DAG would need a standardized representation of functions and expressions that then get mapped to Table API expressions, letting the planner optimize them and generate Java code for them
   - Similar to how UDFs are supported in the Table API, there would be additional "external UDFs" that go through the above-mentioned inter-process communication layer

   - _Advantages:_
     => Flink and Beam could share more language bindings
     => Flink would execute Beam portability programs fast, without intermediate abstraction and directly in the JVM for many operators. Abstraction is still necessary around UDFs and to bridge between serializers / coders, etc.

   - _Open issues:_
     => The biggest question is whether the language-independent DAG is expressive enough to capture all the expressions that we want to map directly to Table API expressions. Currently much is hidden in opaque UDFs. Kenn mentioned the structure should be flexible enough to capture more expressions transparently.

     => If the DAG is generic enough to capture the additional information, we probably still need some standardization, so that all the different language APIs represent their expressions the same way.

     => Similarly, it makes sense to standardize the type system (and type inference) as far as built-in expressions and their interaction with UDFs are concerned. The Flink Table API and Blink teams found this to be essential for a consistent API behavior. This would not prevent all-UDF programs from still using purely binary/opaque types.

     => We need to create a Python API that follows the same structure as Flink's Table API and produces the language-independent DAG.

*Short-term approach in Flink*

   - Goal is to not block Flink's Python effort on the long-term approach and the necessary design and evolution of the language-independent DAG.
   - Depending on the outcome of the above investigation, Flink may initially go with a simple approach to map the Python Table API to the Java Table API via Py4J, as outlined in FLIP-38: https://docs.google.com/document/d/1ybYt-0xWRMa1Yf5VsuqGRtOfJBz4p74ZmDxZYg3j_h8
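The Py4J approach amounts to a thin Python facade whose methods forward to a JVM-side object. A minimal sketch of that delegation pattern, assuming a proxy object whose method calls go to Java (with Py4J this would be obtained through `py4j.java_gateway.JavaGateway`). `Table` here is a hypothetical facade, not the class defined in FLIP-38, and `RecordingJavaTable` is a stand-in so the sketch runs without a JVM.

```python
from typing import Any, Tuple


class Table:
    """Hypothetical Python facade that forwards every call to a JVM-side Table.

    With Py4J, `j_table` would be a proxy for a Java object, and each method
    call on it is sent over Py4J's socket connection to the JVM.
    """

    def __init__(self, j_table: Any) -> None:
        self._j_table = j_table

    def filter(self, predicate: str) -> "Table":
        return Table(self._j_table.filter(predicate))

    def select(self, fields: str) -> "Table":
        return Table(self._j_table.select(fields))


class RecordingJavaTable:
    """Stand-in for a Py4J proxy; records the calls that would go to Java."""

    def __init__(self, calls: Tuple[str, ...] = ()) -> None:
        self.calls = calls

    def filter(self, predicate: str) -> "RecordingJavaTable":
        return RecordingJavaTable(self.calls + ("filter(" + predicate + ")",))

    def select(self, fields: str) -> "RecordingJavaTable":
        return RecordingJavaTable(self.calls + ("select(" + fields + ")",))


result = Table(RecordingJavaTable()).filter("a > 10").select("a, b")
print(result._j_table.calls)  # ('filter(a > 10)', 'select(a, b)')
```

This is why the short-term approach needs no Python at execution time: the Python process only builds the program, and the JVM plans and runs it.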



On Tue, Apr 23, 2019 at 4:14 AM jincheng sun <sunjincheng...@gmail.com> wrote:

    Hi everyone,

    Thank you for all of your feedback and comments in the Google doc!

    I have updated the Google doc and added the UDF part. A short summary:

       - Python Table API - Flink introduces a set of Python Table API interfaces which align with the Flink Java Table API. It uses the Py4J framework to communicate between the Python VM and the Java VM.
       - Python user-defined functions - IMO, for the UDF communication framework, Flink will try to reuse Beam's existing work as much as possible, and we will do our best for this. The first step is to solve the interface definition problem that has been discussed in the Beam community: changing `WindowedValue<T>` to `T` in the FnDataService and BeamFnDataClient interface definitions.
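The interface change above amounts to making the data-plane interfaces generic over the element type instead of hard-wiring `WindowedValue<T>`. FnDataService and BeamFnDataClient are Java interfaces in Beam; the following is only a Python analog of the idea, and `DataReceiver` and this `WindowedValue` class are hypothetical, not Beam's definitions.

```python
from dataclasses import dataclass
from typing import Callable, Generic, List, Tuple, TypeVar

T = TypeVar("T")


@dataclass(frozen=True)
class WindowedValue(Generic[T]):
    """Beam-style element: the value plus timestamp and window metadata (simplified)."""
    value: T
    timestamp_ms: int
    windows: Tuple[str, ...]


class DataReceiver(Generic[T]):
    """Hypothetical data-plane receiver that is generic over the element type.

    Nothing here assumes WindowedValue, so Beam can use T = WindowedValue[X]
    while Flink can use plain rows.
    """

    def __init__(self, decode: Callable[[bytes], T]) -> None:
        self._decode = decode
        self.received: List[T] = []

    def accept(self, encoded: bytes) -> None:
        self.received.append(self._decode(encoded))


# Beam-style: every element carries windowing metadata.
beam_receiver: DataReceiver[WindowedValue[str]] = DataReceiver(
    lambda b: WindowedValue(b.decode("utf-8"), 0, ("global",)))
# Flink-style: elements are plain values; no WindowedValue wrapper is needed.
flink_receiver: DataReceiver[str] = DataReceiver(lambda b: b.decode("utf-8"))

for receiver in (beam_receiver, flink_receiver):
    receiver.accept(b"row-1")
```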

    The details can be found here:
    https://docs.google.com/document/d/1ybYt-0xWRMa1Yf5VsuqGRtOfJBz4p74ZmDxZYg3j_h8/edit?usp=sharing

    So we can start developing the Table API without UDFs in Flink, and work with the Beam community to push forward the abstraction in Beam.

    What do you think?

    Regards,
    Jincheng

    jincheng sun <sunjincheng...@gmail.com> wrote on Wed, Apr 17, 2019 at 4:01 PM:

     > Hi Stephan,
     >
     > Thanks for your suggestion and summary. :)
     >
     >> ==> The FLIP should probably reflect the full goal rather than only the first implementation step; this would make sure everyone understands what the final goal of the effort is.
     >
     >
     > I totally agree that we can implement the functionality in stages, but the FLIP needs to reflect the full final goal. I agree with Thomas and you; I will add the design of the UDF part later.
     >
     > Yes, you are right. Currently we only consider `flink run` and `python-shell` as job entry points, and we should add the REST API as another entry point.
     >
     >> It would be super cool if the Python API would work seamlessly with all modes of starting Flink jobs.
     >
     >
     > If I understand you correctly, to support the Python Table API on Kubernetes we only need to add (or improve the existing) REST API for the Python Table API. We may also need to release a Docker image that supports Python, which would make it easy to deploy Python Table API jobs on Kubernetes.
     >
     > So, in the end, we would support the following ways to submit Python Table API jobs:
     > - Python Shell - interactive development.
     > - CLI - submit the job via `flink run`, e.g. to deploy the job to a YARN cluster.
     > - REST - submit the job via the REST API, e.g. to deploy the job to a Kubernetes cluster.
     >
     > Please correct me if I have misunderstood anything.
     >
     > Thanks,
     > Jincheng
     >
     >
     > Stephan Ewen <se...@apache.org> wrote on Fri, Apr 12, 2019 at 12:22 AM:
     >
     >> One more thought:
     >>
     >> The FLIP is very much centered on the CLI and it looks like it has mainly batch jobs and session clusters in mind.
     >>
     >> In very many cases, especially in streaming cases, the CLI (or shell) is not the entry point for a program.
     >> See for example the use of Flink jobs on Kubernetes (Container Mode / Entrypoint).
     >>
     >> It would be super cool if the Python API would work seamlessly with all modes of starting Flink jobs.
     >> That would make it available to all users.
     >>
     >> On Thu, Apr 11, 2019 at 5:34 PM Stephan Ewen <se...@apache.org> wrote:
     >>
     >> > Hi all!
     >> >
     >> > I think that all the opinions and ideas are not actually in conflict, so let me summarize what I understand the proposal to be:
     >> >
     >> > *(1) Long-term goal: Full Python Table API with UDFs*
     >> >
     >> >      To break the implementation effort up into stages, the first step would be the API without UDFs.
     >> >      Because of all the built-in functions in the Table API, this can already exist by itself, with some value, but it is ultimately quite limited without UDF support.
     >> >
     >> >      ==> The FLIP should probably reflect the full goal rather than only the first implementation step; this would make sure everyone understands what the final goal of the effort is.
     >> >
     >> >
     >> > *(2) Relationship to Beam Language Portability*
     >> >
     >> > Flink's own Python Table API and Beam-Python on Flink add different value and are both attractive for different scenarios.
     >> >
     >> >   - Beam's Python API supports complex pipelines in a style similar to the DataStream API. There is also the ecosystem of libraries built on top of that DSL, for example for machine learning.
     >> >
     >> >   - Flink's Python Table API builds mostly relational expressions, plus some UDFs. Most of the Python code never executes in Python, though. It is geared at use cases similar to Flink's Table API.
     >> >
     >> > Both approaches mainly differ in how the streaming DAG is built from Python code and received by the JVM.
     >> >
     >> > In previous discussions, we concluded that for inter-process data exchange (JVM <> Python), we want to share code with Beam.
     >> > That part is possibly the most crucial piece for getting performance out of the Python DSL, so it will benefit from shared development, optimizations, etc.
     >> >
     >> > Best,
     >> > Stephan
     >> >
     >> >
     >> >
     >> >
     >> > On Fri, Apr 5, 2019 at 5:25 PM jincheng sun <sunjincheng...@gmail.com> wrote:
     >> >
     >> >> One more thing: it is worth mentioning that the Flink Table API is a superset of Flink SQL, for example:
     >> >> - AddColumns/DropColumns/RenameColumns; details can be found in the Google doc:
     >> >> https://docs.google.com/document/d/1tryl6swt1K1pw7yvv5pdvFXSxfrBZ3_OkOObymis2ck/edit#heading=h.7rwcjbvr52dc
     >> >> - Interactive Programming in the Flink Table API; details can be found in FLIP-36:
     >> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink
     >> >> I think that in the future, more and more features that cannot be expressed in SQL will be added to the Table API.
     >> >>
     >> >> Thomas Weise <thomas.we...@gmail.com> wrote on Fri, Apr 5, 2019 at 12:11 PM:
     >> >>
     >> >> > Hi Jincheng,
     >> >> >
     >> >> > >
     >> >> > > Yes, we can add use case examples to both the Google doc and the FLIP. I have already added simple usage examples to the Google doc. Which kinds of examples would you like to see? :)
     >> >> > >
     >> >> >
     >> >> > Do you have use cases where the Python Table API can be applied without UDF support?
     >> >> >
     >> >> > (And where the same could not be accomplished with just SQL.)
     >> >> >
     >> >> >
     >> >> > > The short answer on UDF support is yes. As you said, the Python Table API needs UDF support, including UDFs, UDTFs, and UDAFs. This should be discussed once the basic Python Table API is supported: UDFs involve managing the Python environment and runtime-level communication between Java and Python, and UDAFs in Flink also involve state, so this topic deserves an in-depth discussion in a separate thread.
     >> >> > >
     >> >> >
     >> >> > The current proposal for job submission touches something that Beam portability already had to solve.
     >> >> >
     >> >> > If we think that the Python Table API will only be useful with UDF support (question above), then it may be better to discuss the first step with the final goal in mind. If we find that Beam can be used for the UDF part, then approach 1 vs. approach 2 in the doc (for the client-side language boundary) may look different.
     >> >> >
     >> >> >
     >> >> > >
     >> >> > > I think that no matter how Flink and Beam work together at the UDF level, it will not affect the current Python API (interface), so we can first support the Python API in Flink and then start on UDX (UDF/UDTF/UDAF) support.
     >> >> > >
     >> >> > >
     >> >> > I agree that the client side API should not be affected.
     >> >> >
     >> >> >
     >> >> > > Many thanks for your valuable comments in the Google doc! I will reply to them there. :)
     >> >> > >
     >> >> > >
     >> >> > > Regards,
     >> >> > > Jincheng
     >> >> > >
     >> >> > > Thomas Weise <t...@apache.org> wrote on Thu, Apr 4, 2019 at 8:03 AM:
     >> >> > >
     >> >> > > > Thanks for putting this proposal together.
     >> >> > > >
     >> >> > > > It would be nice if you could share a few use case examples (maybe add them as a section to the FLIP?).
     >> >> > > >
     >> >> > > > The reason I ask: the Table API is immensely useful, but it isn't clear to me what value other language bindings provide without UDF support. With FLIP-38 it will be possible to write a program in Python, but not to execute Python functions. Without UDF support, isn't it possible to achieve roughly the same with plain SQL? In which situation would I use the Python API?
     >> >> > > >
     >> >> > > > There was a related discussion regarding UDF support in [1]. If the assumption is that such support will be added later, then I would like to circle back to the question of why this cannot be built on top of Beam. It would be nice to clarify the bigger goal before embarking on the first milestone.
     >> >> > > >
     >> >> > > > I'm going to comment on other things in the doc.
     >> >> > > >
     >> >> > > > [1] https://lists.apache.org/thread.html/f6f8116b4b38b0b2d70ed45b990d6bb1bcb33611fde6fdf32ec0e840@%3Cdev.flink.apache.org%3E
     >> >> > > >
     >> >> > > > Thomas
     >> >> > > >
     >> >> > > >
     >> >> > > > On Wed, Apr 3, 2019 at 12:35 PM Shuyi Chen <suez1...@gmail.com> wrote:
     >> >> > > >
     >> >> > > > > Thanks a lot for driving the FLIP, jincheng. The approach looks good. Adding multi-language support sounds like a promising direction to expand the footprint of Flink. Do we have plans for adding Golang support? Many backend engineers nowadays are familiar with Go but probably less so with Java, so adding Golang support would significantly reduce their friction in using Flink. Also, do we have a design for multi-language UDF support, and what's the timeline for adding DataStream API support? We would like to help and contribute as well, as we have a similar need internally at our company. Thanks a lot.
     >> >> > > > >
     >> >> > > > > Shuyi
     >> >> > > > >
     >> >> > > > > On Tue, Apr 2, 2019 at 1:03 AM jincheng sun <sunjincheng...@gmail.com> wrote:
     >> >> > > > >
     >> >> > > > > > Hi All,
     >> >> > > > > > As Xianda brought up in the previous email, there are a large number of data analysis users who want Flink to support Python. At the Flink API level, we have the DataStream API, the DataSet API, and the Table API & SQL, and the Table API will become the first-class citizen. The Table API is declarative and can be automatically optimized, as mentioned in Stephan's Flink mid-term roadmap. So we are first considering supporting Python at the Table API level, to cater to the current large number of analytics users. To further promote Python support at the Flink Table level, Dian, Wei, and I discussed this offline a bit and came up with an initial feature outline as follows:
     >> >> > > > > >
     >> >> > > > > > - Python Table API Interface
     >> >> > > > > >   Introduce a set of Python Table API interfaces, including interface definitions such as Table, TableEnvironment, TableConfig, etc.
     >> >> > > > > >
     >> >> > > > > > - Implementation Architecture
     >> >> > > > > >   We will offer two alternative architecture options, one for pure Python language support and one for an extended multi-language design.
     >> >> > > > > >
     >> >> > > > > > - Job Submission
     >> >> > > > > >   Provide a way to submit Python Table API jobs (locally or remotely).
     >> >> > > > > >
     >> >> > > > > > - Python Shell
     >> >> > > > > >   The Python Shell provides an interactive way for users to write and execute Flink Python Table API jobs.
     >> >> > > > > >
     >> >> > > > > >
     >> >> > > > > > The design document for FLIP-38 can be found here:
     >> >> > > > > > https://docs.google.com/document/d/1ybYt-0xWRMa1Yf5VsuqGRtOfJBz4p74ZmDxZYg3j_h8/edit?usp=sharing
     >> >> > > > > >
     >> >> > > > > > I am looking forward to your comments and feedback.
     >> >> > > > > >
     >> >> > > > > > Best,
     >> >> > > > > > Jincheng
     >> >> > > > > >
