Hi Kenn, I think you are right: the Python SDK harness can be shared
with Flink, although it also needs some new primitive operations. On
the runner side, I think most of the code in runners/java-fn-execution
can be shared (though some of it needs improvement, such as
FnDataService), while some of it cannot be shared, such as the job
submission code. So we may need to set up a document that clearly
analyzes which parts can be shared as-is, which can be shared after
some changes, and which definitely cannot be shared.
Hi Max, thanks for sharing your opinion. I also prefer using the Beam
Fn services as a library, and I am willing to put more effort into
this.
Judging from the current code, abstracting the Fn services into a
library that other projects can depend on requires a lot of effort
from the Beam community; turning `WindowedValue<T>` into `T` is just
the beginning. If the Beam community is willing to abstract the Fn
services into such a library, I can try to draft a document, though I
will probably need a lot of help from you, Kenn, Lukasz, and the Beam
community along the way. (I am a newcomer to the Beam community :-))
What do you think?
Regards,
Jincheng
On Wed, Apr 24, 2019 at 3:32 AM Kenneth Knowles <[email protected]> wrote:
It seems to me that the most valuable code to share and keep up with
is the Python/Go/etc. SDK harnesses; they would need to be enhanced
with new primitive operations. So you would want to depend directly
on, and share, the original proto-generated classes too, which Beam
publishes as separate artifacts for Java. Is the runner-side support
code that valuable for direct integration into Flink? I would expect
that once you get past trivial wrappers (which you can copy/paste with
no loss), you would hit differences in architecture and diverge
anyhow.
Kenn
On Tue, Apr 23, 2019 at 5:32 AM Maximilian Michels <[email protected]> wrote:
Hi Jincheng,
Copying code is a solution for the short term. In the long run I'd
like the Fn services to be a library not only for the Beam portability
layer but also for other projects which want to leverage it. We should
thus make an effort to make it more generic/extensible where necessary
and feasible.
Since you are investigating reuse of Beam portability in the context
of Flink, do you think it would make sense to set up a document where
we collect ideas and challenges?
Thanks,
Max
On 23.04.19 13:00, jincheng sun wrote:
> Hi Reuven,
>
> I think you have provided an alternative for other communities that
> want to take advantage of Beam's existing achievements. Thank you
> very much!
>
> I think the Flink community can choose either to copy Beam's code or
> to depend directly on Beam's libraries. The Flink community has also
> started a discussion; more info can be found here:
>
> <http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-38-Support-python-language-in-flink-TableAPI-td28061.html#a28096>
>
> The purpose of turning `WindowedValue<T>` into `T` is to make the
> interface design of Beam more versatile, so that other open source
> projects have the opportunity to take advantage of Beam's existing
> achievements. Of course, just changing `WindowedValue<T>` into `T` is
> not enough for the code to be shared with other projects as a
> library; more effort is needed. If Beam can provide such a library
> in the future, contributors from other communities will also be
> willing to contribute to the Beam community. This will benefit both
> the communities that want to take advantage of Beam's existing
> achievements and the Beam community itself. Thanks also to Thomas,
> who has already put a lot of effort into this.
>
> Thanks again for your valuable suggestion, and any feedback is
> welcome!
>
> Best,
> Jincheng
>
> On Tue, Apr 23, 2019 at 1:00 AM Reuven Lax <[email protected]> wrote:
>
> One concern here: these interfaces are intended for use within the
> Beam project. Beam may decide to make specific changes to them to
> support needed functionality in Beam. If they are being reused by
> other projects, then those changes risk breaking those other
> projects in unexpected ways. I don't think we can guarantee that we
> won't do that. If this is useful in Flink, it would be safer IMO to
> copy the code rather than to depend on it directly.
>
> On Mon, Apr 22, 2019 at 12:08 AM jincheng sun <[email protected]> wrote:
>
> Hi Kenn,
>
> Thanks for your reply, and for explaining the design of WindowedValue
> so clearly!
>
> At present, the definitions of `FnDataService` and `BeamFnDataClient`
> in the Data Plane are very clear and general, e.g.
> send(...)/receive(...). If they are only used within the Beam
> project, they are already very good, because `WindowedValue` is a
> very basic data structure in Beam: both the runner and the SDK
> harness define the WindowedValue data structure.
>
> The reason I want to change the interface parameter from
> `WindowedValue<T>` to `T` is that I want to turn the Data Plane
> interface into a library that can be used by other projects (such as
> Apache Flink), so that other projects can have their own
> `FnDataService` implementations. However, the definition of
> `WindowedValue` does not fit every project. For example, Apache
> Flink has its own structure similar to WindowedValue: Flink's
> streaming runtime uses StreamRecord. If we change `WindowedValue<T>`
> to `T`, then other projects' implementations do not need to wrap
> their data in WindowedValue, and the interface becomes more concise.
> Furthermore, some cases need only a plain `T`, such as Apache
> Flink's DataSet operators.
>
> So I agree with your understanding: I don't expect a
> `WindowedValueXXX<T>` in the FnDataService interface; I hope to just
> use a `T`.
>
> Do you see any problems if we change the interface parameter from
> `WindowedValue<T>` to `T`?
>
> Thanks,
> Jincheng
>
> On Sat, Apr 20, 2019 at 2:38 AM Kenneth Knowles <[email protected]> wrote:
>
> WindowedValue has always been an interface, not a concrete
> representation:
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java#L52
> It is an abstract class because we started in Java 7, where you could
> not have default methods, and just due to legacy style concerns. It
> is not just discussed but implemented: there are WindowedValue
> implementations with fewer allocations.
> At the coder level, it was also always intended to have multiple
> encodings. We already have separate encodings based on whether there
> is one window or multiple windows. The coder for a particular kind of
> WindowedValue should decide this. Before the Fn API none of this had
> to be standardized, because the runner could just choose whatever it
> wanted. Now we have to standardize any encodings that runners and
> harnesses both need to know. There should be many, and adding more
> should be just a matter of standardization, not new design.
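As a rough illustration of the point that the coder, not the pipeline, decides the encoding, here is a minimal Java sketch. The `WindowedElement` record, the `WindowAwareCoder` class, and the byte layout (a one-byte tag, then either a single window id or a count followed by ids) are all hypothetical; this is not Beam's WindowedValue coder or its actual wire format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical element-plus-metadata type; not Beam's WindowedValue.
record WindowedElement(String value, long timestampMillis, List<Long> windowIds) {}

// Hypothetical coder that picks an encoding per element: a compact form when
// the element is in exactly one window, a count-prefixed form otherwise.
final class WindowAwareCoder {
  private static final byte SINGLE_WINDOW = 0;
  private static final byte MULTI_WINDOW = 1;

  byte[] encode(WindowedElement e) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      out.writeUTF(e.value());
      out.writeLong(e.timestampMillis());
      if (e.windowIds().size() == 1) {
        out.writeByte(SINGLE_WINDOW); // no window count needed
        out.writeLong(e.windowIds().get(0));
      } else {
        out.writeByte(MULTI_WINDOW);
        out.writeInt(e.windowIds().size());
        for (long w : e.windowIds()) {
          out.writeLong(w);
        }
      }
      out.flush();
      return bytes.toByteArray();
    } catch (IOException ex) {
      throw new UncheckedIOException(ex);
    }
  }

  WindowedElement decode(byte[] data) {
    try {
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
      String value = in.readUTF();
      long timestamp = in.readLong();
      byte tag = in.readByte();
      List<Long> windows = new ArrayList<>();
      if (tag == SINGLE_WINDOW) {
        windows.add(in.readLong());
      } else if (tag == MULTI_WINDOW) {
        int count = in.readInt();
        for (int i = 0; i < count; i++) {
          windows.add(in.readLong());
        }
      } else {
        throw new IOException("unknown encoding tag " + tag);
      }
      return new WindowedElement(value, timestamp, windows);
    } catch (IOException ex) {
      throw new UncheckedIOException(ex);
    }
  }
}

final class WindowCoderDemo {
  public static void main(String[] args) {
    WindowAwareCoder coder = new WindowAwareCoder();
    WindowedElement single = new WindowedElement("a", 1000L, List.of(7L));
    WindowedElement multi = new WindowedElement("a", 1000L, List.of(7L, 8L));
    System.out.println(coder.encode(single).length); // 20
    System.out.println(coder.encode(multi).length); // 32
  }
}
```

Because the tag lives inside the encoded bytes, the single-window case skips the 4-byte window count without changing anything that users or the pipeline graph see, so the choice stays entirely with the coder.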
>
> None of this should be user-facing or in the runner API / pipeline
> graph; that is critical to keeping the backend between the runner and
> SDK harness flexible.
>
> If I understand correctly from our offline discussion, you are
> interested in the case where you issue a ProcessBundleRequest to the
> SDK harness and none of the primitives in the subgraph will ever
> observe the metadata. So you don't want to have even a tiny
> "WindowedValueWithNoMetadata". Is that accurate?
>
> Kenn
>
> On Fri, Apr 19, 2019 at 10:17 AM jincheng sun <[email protected]> wrote:
>
> Thank you! And have a nice weekend!
>
>
> On Sat, Apr 20, 2019 at 1:14 AM Lukasz Cwik <[email protected]> wrote:
>
> I have added you as a contributor.
>
> On Fri, Apr 19, 2019 at 9:56 AM jincheng sun <[email protected]> wrote:
>
> Hi Lukasz,
>
> Thanks for your confirmation and for providing more context. :)
>
> Would you please give me contributor permission? My JIRA ID is
> sunjincheng121.
>
> I would like to create/assign tickets
for this work.
>
> Thanks,
> Jincheng
>
> On Sat, Apr 20, 2019 at 12:26 AM Lukasz Cwik <[email protected]> wrote:
>
> Since I don't think this is a contentious change.
>
> On Fri, Apr 19, 2019 at 9:25 AM Lukasz Cwik <[email protected]> wrote:
>
> Yes, using T makes sense.
>
> The WindowedValue was meant to be a context object in the SDK harness
> that propagates various information about the current element. In
> the past we have discussed:
> * making optimizations which would pass around less of the context
>   information if we know that the DoFns don't need it (for example,
>   all the values share the same window).
> * versioning the encoding separately from the WindowedValue context
>   object (see the recent discussion about element timestamp
>   precision [1]).
> * the runner may want its own representation of a context object
>   that makes sense for it, which isn't necessarily a WindowedValue.
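The first optimization in the list above can be sketched as follows: when every element in a batch is known to share one window, that window can be written once in a header instead of once per element. `SharedWindowBatch`, `BatchCoders`, and both byte layouts are hypothetical and only illustrate the idea; they are not an existing Beam encoding.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical batch whose values are all known to share one window.
record SharedWindowBatch(long windowId, List<String> values) {}

final class BatchCoders {
  // Per-element layout: each value carries its own copy of the window id.
  static byte[] encodePerElement(SharedWindowBatch batch) {
    return write(out -> {
      out.writeInt(batch.values().size());
      for (String v : batch.values()) {
        out.writeLong(batch.windowId());
        out.writeUTF(v);
      }
    });
  }

  // Shared-window layout: the window id is written once, as a header.
  static byte[] encodeShared(SharedWindowBatch batch) {
    return write(out -> {
      out.writeLong(batch.windowId());
      out.writeInt(batch.values().size());
      for (String v : batch.values()) {
        out.writeUTF(v);
      }
    });
  }

  static SharedWindowBatch decodeShared(byte[] data) {
    try {
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
      long windowId = in.readLong();
      int count = in.readInt();
      List<String> values = new ArrayList<>(count);
      for (int i = 0; i < count; i++) {
        values.add(in.readUTF());
      }
      return new SharedWindowBatch(windowId, values);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Lets the lambdas above call DataOutputStream methods that throw IOException.
  private interface Body {
    void writeTo(DataOutputStream out) throws IOException;
  }

  private static byte[] write(Body body) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      body.writeTo(out);
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}

final class SharedWindowDemo {
  public static void main(String[] args) {
    SharedWindowBatch batch = new SharedWindowBatch(5L, List.of("x", "y", "z"));
    System.out.println(BatchCoders.encodePerElement(batch).length); // 37
    System.out.println(BatchCoders.encodeShared(batch).length); // 21
  }
}
```

For three one-character values the shared layout is 21 bytes versus 37 for the per-element layout, and the gap grows with batch size. A real version would also need the runner and harness to agree on when this layout applies.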
>
> Feel free to cut a JIRA about this and start working on a change
> towards this.
>
> [1] https://lists.apache.org/thread.html/221b06e81bba335d0ea8d770212cc7ee047dba65bec7978368a51473@%3Cdev.beam.apache.org%3E
>
> On Fri, Apr 19, 2019 at 3:18 AM jincheng sun <[email protected]> wrote:
>
> Hi Beam devs,
>
> I read some of the docs about `Communicating over the Fn API` in
> Beam. I feel that Beam has a very good design for the Control
> Plane/Data Plane/State Plane/Logging services, as described in the
> <How to send and receive data> document. When the runner and the SDK
> harness communicate, the Data Plane API uses WindowedValue (an
> immutable triple of value, timestamp, and windows) as the contract
> object between them. I see the interface definitions for sending and
> receiving data in the code as follows:
>
> - org.apache.beam.runners.fnexecution.data.FnDataService
>
>     public interface FnDataService {
>       <T> InboundDataClient receive(
>           LogicalEndpoint inputLocation,
>           Coder<WindowedValue<T>> coder,
>           FnDataReceiver<WindowedValue<T>> listener);
>
>       <T> CloseableFnDataReceiver<WindowedValue<T>> send(
>           LogicalEndpoint outputLocation,
>           Coder<WindowedValue<T>> coder);
>     }
>
>
>
> - org.apache.beam.fn.harness.data.BeamFnDataClient
>
>     public interface BeamFnDataClient {
>       <T> InboundDataClient receive(
>           ApiServiceDescriptor apiServiceDescriptor,
>           LogicalEndpoint inputLocation,
>           Coder<WindowedValue<T>> coder,
>           FnDataReceiver<WindowedValue<T>> receiver);
>
>       <T> CloseableFnDataReceiver<WindowedValue<T>> send(
>           Endpoints.ApiServiceDescriptor apiServiceDescriptor,
>           LogicalEndpoint outputLocation,
>           Coder<WindowedValue<T>> coder);
>     }
>
>
> Both `Coder<WindowedValue<T>>` and `FnDataReceiver<WindowedValue<T>>`
> use `WindowedValue` as the data structure that both the runner and
> the SDK harness understand. Control Plane/Data Plane/State
> Plane/Logging is a high-level abstraction; the Control Plane and
> Logging in particular are common requirements for all multi-language
> platforms. For example, the Flink community is also discussing how to
> support Python UDFs, as well as how to deal with the Docker
> environment, how to transfer data, how to access state, how to do
> logging, etc. If Beam can further abstract these service interfaces,
> i.e., make the interface definitions compatible with multiple engines
> and eventually provide them to other projects as libraries, it will
> definitely help other platforms that want to support multiple
> languages. So could Beam further abstract the interface definitions
> of FnDataService and BeamFnDataClient? Here I throw out a minnow to
> catch a whale: take the FnDataService#receive interface as an
> example, and turn `WindowedValue<T>` into `T` so that other platforms
> can extend it arbitrarily, as follows:
>
>     <T> InboundDataClient receive(
>         LogicalEndpoint inputLocation,
>         Coder<T> coder,
>         FnDataReceiver<T> listener);
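Here is a minimal sketch of what the proposed `T`-typed shape could look like, built on simplified stand-ins. `ElementCoder`, `ElementReceiver`, `GenericDataService`, `LoopbackDataService`, and `TimestampedRecord` are hypothetical names, not Beam's `Coder`, `FnDataReceiver`, or `FnDataService`. An in-memory loopback replaces the real gRPC data plane. The sketch shows that the service itself never inspects the element type, so a caller can pick a record with or without window metadata.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical, simplified stand-ins; not Beam's Coder or FnDataReceiver.
interface ElementCoder<T> {
  byte[] encode(T value);

  T decode(byte[] bytes);
}

interface ElementReceiver<T> {
  void accept(T value);
}

// Proposed shape: elements are a plain T, so the caller decides whether
// T carries window/timestamp metadata.
interface GenericDataService {
  <T> void receive(String inputLocation, ElementCoder<T> coder, ElementReceiver<T> listener);

  <T> ElementReceiver<T> send(String outputLocation, ElementCoder<T> coder);
}

// In-memory loopback standing in for the gRPC data plane: bytes sent to an
// endpoint are decoded and handed to the listener registered for it.
final class LoopbackDataService implements GenericDataService {
  private final Map<String, Consumer<byte[]>> routes = new ConcurrentHashMap<>();

  @Override
  public <T> void receive(
      String inputLocation, ElementCoder<T> coder, ElementReceiver<T> listener) {
    routes.put(inputLocation, bytes -> listener.accept(coder.decode(bytes)));
  }

  @Override
  public <T> ElementReceiver<T> send(String outputLocation, ElementCoder<T> coder) {
    return value -> {
      Consumer<byte[]> route = routes.get(outputLocation);
      if (route == null) {
        throw new IllegalStateException("no receiver registered for " + outputLocation);
      }
      route.accept(coder.encode(value));
    };
  }
}

// Flink-style record with a timestamp but no windows (hypothetical stand-in).
record TimestampedRecord(String value, long timestamp) {}

final class TimestampedRecordCoder implements ElementCoder<TimestampedRecord> {
  @Override
  public byte[] encode(TimestampedRecord r) {
    return (r.timestamp() + ":" + r.value()).getBytes(StandardCharsets.UTF_8);
  }

  @Override
  public TimestampedRecord decode(byte[] bytes) {
    String s = new String(bytes, StandardCharsets.UTF_8);
    int colon = s.indexOf(':'); // the timestamp prefix never contains ':'
    return new TimestampedRecord(s.substring(colon + 1), Long.parseLong(s.substring(0, colon)));
  }
}

final class LoopbackDemo {
  public static void main(String[] args) {
    GenericDataService service = new LoopbackDataService();
    List<TimestampedRecord> received = new ArrayList<>();
    service.receive("input-1", new TimestampedRecordCoder(), received::add);
    service.send("input-1", new TimestampedRecordCoder()).accept(new TimestampedRecord("a", 42L));
    System.out.println(received); // [TimestampedRecord[value=a, timestamp=42]]
  }
}
```

Under this shape, a Beam caller could presumably still pass its own windowed-value type as `T` together with a matching coder, while a Flink-style caller passes its record type directly without wrapping.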
>
> What do you think?
>
> Feel free to correct me if I have misunderstood anything. Any
> feedback is welcome!
>
>
> Regards,
> Jincheng
>