Hi Reuven,
I think you have offered an alternative for other communities that
want to build on Beam's existing work. Thank you very much!
I think the Flink community can either copy Beam's code or depend
directly on Beam's class library. The Flink community has also started
a discussion; more information can be found here:
<http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-38-Support-python-language-in-flink-TableAPI-td28061.html#a28096>
The purpose of turning `WindowedValue<T>` into `T` is to make Beam's
interface design more versatile, so that other open source projects
can build on Beam's existing work. Of course, just changing
`WindowedValue<T>` to `T` is not enough for the interfaces to be shared
with other projects as a class library; more work is needed. If Beam
can provide such a class library in the future, contributors from other
communities will also be willing to contribute to the Beam community.
This will benefit both the communities that want to build on Beam's
existing work and the Beam community itself. Thanks also to Thomas, who
has put a lot of effort into this.
Thanks again for your valuable suggestion; any feedback is welcome!
Best,
Jincheng
Reuven Lax <[email protected] <mailto:[email protected]>> wrote on
Tue, Apr 23, 2019 at 1:00 AM:
One concern here: these interfaces are intended for use within the
Beam project. Beam may decide to make specific changes to them to
support needed functionality in Beam. If they are being reused by
other projects, then those changes risk breaking those other
projects in unexpected ways. I don't think we can guarantee that we
don't do that. If this is useful in Flink, it would be safer to copy
the code IMO rather than to directly depend on it.
On Mon, Apr 22, 2019 at 12:08 AM jincheng sun
<[email protected] <mailto:[email protected]>> wrote:
Hi Kenn,
Thanks for your reply and for explaining the design of
WindowedValue so clearly!
At present, the definitions of `FnDataService` and
`BeamFnDataClient` in the Data Plane are very clear and general,
for example send(...)/receive(...). If they are only used within
the Beam project, they are already very good. Because
`WindowedValue` is a very basic data structure in the Beam
project, both the Runner and the SDK harness define it.
The reason I want to change the interface parameter from
`WindowedValue<T>` to `T` is that I want to turn the `Data
Plane` interface into a class library that other projects (such
as Apache Flink) can use, so that they can have their own
`FnDataService` implementation. However, the definition of
`WindowedValue` does not fit all projects. For example, Apache
Flink has a similar structure of its own: Flink streaming has
StreamRecord. If we change `WindowedValue<T>` to `T`, other
projects' implementations do not need to wrap their data in
WindowedValue, and the interface becomes more concise.
Furthermore, in some cases a plain `T` is all that is needed,
such as in the Apache Flink DataSet operator.
So I agree with your understanding: I don't expect any
`WindowedValueXXX<T>` in the FnDataService interface; I hope to
use just a `T`.
Do you see any problem if we change the interface parameter
from `WindowedValue<T>` to `T`?
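To make the wrapping cost concrete, here is a minimal sketch under stated assumptions. `EngineRecord`, `WindowedLike`, and `Receiver` are hypothetical stand-ins invented for illustration (loosely inspired by StreamRecord, WindowedValue, and FnDataReceiver); they are not the real Flink or Beam classes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch; these are not the real Beam or Flink types.
class NativeRecordSketch {

  /** Hypothetical engine-native record: a value plus a timestamp. */
  static final class EngineRecord<T> {
    final T value;
    final long timestamp;

    EngineRecord(T value, long timestamp) {
      this.value = value;
      this.timestamp = timestamp;
    }
  }

  /** Hypothetical simplified stand-in for a (value, timestamp, windows) triple. */
  static final class WindowedLike<T> {
    final T value;
    final long timestamp;
    final List<String> windows;

    WindowedLike(T value, long timestamp, List<String> windows) {
      this.value = value;
      this.timestamp = timestamp;
      this.windows = windows;
    }
  }

  /** Stand-in for a data receiver that is generic in its element type. */
  interface Receiver<T> {
    void accept(T element);
  }

  /** Adapter an engine would need if the interface were bound to WindowedLike<T>. */
  static <T> Receiver<WindowedLike<T>> unwrapping(Receiver<EngineRecord<T>> target) {
    return wv -> target.accept(new EngineRecord<>(wv.value, wv.timestamp));
  }

  static List<String> demo() {
    List<String> seen = new ArrayList<>();
    Receiver<EngineRecord<String>> engineReceiver = r -> seen.add(r.value + "@" + r.timestamp);

    // Bound interface: data arrives wrapped and must be converted per element.
    Receiver<WindowedLike<String>> bound = unwrapping(engineReceiver);
    bound.accept(new WindowedLike<>("x", 1L, Collections.singletonList("global")));

    // Generic interface: the engine's own record type is used directly.
    engineReceiver.accept(new EngineRecord<>("y", 2L));
    return seen;
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

With a `T`-typed receiver the adapter and the extra per-element allocation go away; whether that matters in practice depends on the engine.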
Thanks,
Jincheng
Kenneth Knowles <[email protected] <mailto:[email protected]>> wrote on
Sat, Apr 20, 2019 at 2:38 AM:
WindowedValue has always been an interface, not a concrete
representation:
<https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java#L52>.
It is an abstract class because we started in Java 7 where
you could not have default methods, and just due to legacy
style concerns. It is not just discussed, but implemented,
that there are WindowedValue implementations with fewer
allocations.
At the coder level, it was also always intended to have
multiple encodings. We already do have separate encodings
based on whether there is 1 window or multiple windows. The
coder for a particular kind of WindowedValue should decide
this. Before the Fn API none of this had to be standardized,
because the runner could just choose whatever it wants. Now
we have to standardize any encodings that runners and
harnesses both need to know. There should be many, and
adding more should be just a matter of standardization, no
new design.
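As an illustration of choosing an encoding by window count, here is a minimal sketch. The tag bytes, field order, and `WindowEncodingSketch` class are assumptions made up for this example; this is not Beam's actual wire format or coder API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; the layout below is invented, not Beam's encoding.
class WindowEncodingSketch {
  static final byte SINGLE_WINDOW = 0;
  static final byte MULTI_WINDOW = 1;

  /** Encodes (value, timestamp, windows), skipping the count when there is one window. */
  static byte[] encode(String value, long timestamp, List<String> windows) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      if (windows.size() == 1) {
        out.writeByte(SINGLE_WINDOW);
        out.writeUTF(windows.get(0));
      } else {
        out.writeByte(MULTI_WINDOW);
        out.writeInt(windows.size());
        for (String window : windows) {
          out.writeUTF(window);
        }
      }
      out.writeLong(timestamp);
      out.writeUTF(value);
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Reads back only the windows, dispatching on the leading tag byte. */
  static List<String> decodeWindows(byte[] encoded) {
    try {
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded));
      byte tag = in.readByte();
      int count = (tag == SINGLE_WINDOW) ? 1 : in.readInt();
      List<String> windows = new ArrayList<>();
      for (int i = 0; i < count; i++) {
        windows.add(in.readUTF());
      }
      return windows;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    List<String> one = new ArrayList<>();
    one.add("w");
    System.out.println(decodeWindows(encode("v", 1L, one)));
  }
}
```

Both sides only need to agree on the tag values and layouts, which is the part that would have to be standardized.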
None of this should be user-facing or in the runner API /
pipeline graph - that is critical to making it flexible on
the backend between the runner & SDK harness.
If I understand it, from our offline discussion, you are
interested in the case where you issue a
ProcessBundleRequest to the SDK harness and none of the
primitives in the subgraph will ever observe the metadata.
So you want to not even have a tiny
"WindowedValueWithNoMetadata". Is that accurate?
Kenn
On Fri, Apr 19, 2019 at 10:17 AM jincheng sun
<[email protected] <mailto:[email protected]>>
wrote:
Thank you! And have a nice weekend!
Lukasz Cwik <[email protected] <mailto:[email protected]>>
wrote on Sat, Apr 20, 2019 at 1:14 AM:
I have added you as a contributor.
On Fri, Apr 19, 2019 at 9:56 AM jincheng sun
<[email protected]
<mailto:[email protected]>> wrote:
Hi Lukasz,
Thanks for your support and for providing more
context. :)
Would you please give me the contributor
permission? My JIRA ID is sunjincheng121.
I would like to create/assign tickets for this work.
Thanks,
Jincheng
Lukasz Cwik <[email protected]
<mailto:[email protected]>> wrote on Sat, Apr 20, 2019
at 12:26 AM:
Since I don't think this is a contentious
change.
On Fri, Apr 19, 2019 at 9:25 AM Lukasz Cwik
<[email protected] <mailto:[email protected]>>
wrote:
Yes, using T makes sense.
The WindowedValue was meant to be a
context object in the SDK harness that
propagates various information about the
current element. We have discussed in
the past about:
* making optimizations which would pass
around less of the context information
if we know that the DoFns don't need it
(for example, all the values share the
same window).
* versioning the encoding separately
from the WindowedValue context object
(see recent discussion about element
timestamp precision [1])
* the runner may want its own
representation of a context object that
makes sense for it which isn't a
WindowedValue necessarily.
Feel free to cut a JIRA about this and
start working on a change towards this.
1:
https://lists.apache.org/thread.html/221b06e81bba335d0ea8d770212cc7ee047dba65bec7978368a51473@%3Cdev.beam.apache.org%3E
On Fri, Apr 19, 2019 at 3:18 AM jincheng
sun <[email protected]
<mailto:[email protected]>> wrote:
Hi Beam devs,
I read some of the docs about
`Communicating over the Fn API` in
Beam. I feel that Beam has a very
good design for the Control Plane/Data
Plane/State Plane/Logging services,
as described in the <How to send
and receive data> document. When the
Runner and the SDK Harness
communicate, the Data Plane API uses
WindowedValue (an immutable triple of
value, timestamp, and windows) as the
contract object between them. I see
the interface definitions for sending
and receiving data in the code as
follows:
- org.apache.beam.runners.fnexecution.data.FnDataService
    public interface FnDataService {
      <T> InboundDataClient receive(
          LogicalEndpoint inputLocation,
          Coder<WindowedValue<T>> coder,
          FnDataReceiver<WindowedValue<T>> listener);
      <T> CloseableFnDataReceiver<WindowedValue<T>> send(
          LogicalEndpoint outputLocation,
          Coder<WindowedValue<T>> coder);
    }
- org.apache.beam.fn.harness.data.BeamFnDataClient
    public interface BeamFnDataClient {
      <T> InboundDataClient receive(
          ApiServiceDescriptor apiServiceDescriptor,
          LogicalEndpoint inputLocation,
          Coder<WindowedValue<T>> coder,
          FnDataReceiver<WindowedValue<T>> receiver);
      <T> CloseableFnDataReceiver<WindowedValue<T>> send(
          Endpoints.ApiServiceDescriptor apiServiceDescriptor,
          LogicalEndpoint outputLocation,
          Coder<WindowedValue<T>> coder);
    }
Both `Coder<WindowedValue<T>>` and
`FnDataReceiver<WindowedValue<T>>`
use `WindowedValue` as the data
structure that both the Runner and
the SDK Harness understand.
Control Plane/Data Plane/State
Plane/Logging is a high-level
abstraction; services such as the
Control Plane and Logging are common
requirements for all multi-language
platforms. For example, the Flink
community is also discussing how to
support Python UDFs, including how
to handle the docker environment, how
to transfer data, how to access
state, how to do logging, etc. If Beam
can further abstract these service
interfaces, i.e., make the interface
definitions compatible with
multiple engines, and finally
provide them to other projects in the
form of class libraries, it will
definitely help other platforms
that want to support multiple
languages. So could Beam further
abstract the interface definitions of
FnDataService and BeamFnDataClient?
As a starting point (throwing out a
minnow to catch a whale), take the
FnDataService#receive interface as
an example, and turn
`WindowedValue<T>` into `T` so that
other platforms can extend it
freely, as follows:
    <T> InboundDataClient receive(
        LogicalEndpoint inputLocation,
        Coder<T> coder,
        FnDataReceiver<T> listener);
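A minimal, self-contained sketch of what such a generic `receive` could look like in use. `Receiver`, `Decoder`, `DataService`, and `InMemoryDataService` are hypothetical stand-ins written for this example; they are simplified and are not Beam's FnDataReceiver, Coder, or FnDataService.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch; none of these types are Beam's real classes.
class GenericDataPlaneSketch {

  /** Stand-in for a receiver of decoded elements of type T. */
  interface Receiver<T> {
    void accept(T element);
  }

  /** Stand-in for a coder; only decoding is needed on the receive side. */
  interface Decoder<T> {
    T decode(byte[] bytes);
  }

  /** Generic receive(...): the element type T is chosen by the caller. */
  interface DataService {
    <T> void receive(String inputLocation, Decoder<T> decoder, Receiver<T> listener);
  }

  /** Toy service that replays a fixed list of encoded payloads. */
  static final class InMemoryDataService implements DataService {
    private final List<byte[]> payloads;

    InMemoryDataService(List<byte[]> payloads) {
      this.payloads = payloads;
    }

    @Override
    public <T> void receive(String inputLocation, Decoder<T> decoder, Receiver<T> listener) {
      for (byte[] payload : payloads) {
        listener.accept(decoder.decode(payload));
      }
    }
  }

  /** Decodes the same payloads once as String and once as Integer. */
  static List<Object> demo() {
    List<byte[]> payloads = Arrays.asList(
        "1".getBytes(StandardCharsets.UTF_8), "2".getBytes(StandardCharsets.UTF_8));
    DataService service = new InMemoryDataService(payloads);
    List<Object> seen = new ArrayList<>();

    Decoder<String> asString = bytes -> new String(bytes, StandardCharsets.UTF_8);
    Receiver<String> stringListener = s -> seen.add("str:" + s);
    service.receive("endpoint-1", asString, stringListener);

    Decoder<Integer> asInt = bytes -> Integer.parseInt(new String(bytes, StandardCharsets.UTF_8));
    Receiver<Integer> intListener = i -> seen.add(i + 10);
    service.receive("endpoint-1", asInt, intListener);
    return seen;
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

The service never inspects the element, so a caller could equally pass a decoder for `WindowedValue<T>` or for an engine-specific record type.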
What do you think?
Feel free to correct me if I have
misunderstood anything. Any feedback
is welcome!
Regards,
Jincheng