Thanks for all these docs! They are exactly what was needed for new contributors, as discussed in this thread:

https://lists.apache.org/thread.html/ac93d29424e19d57097373b78f3f5bcbc701e4b51385a52a6e27b7ed@%3Cdev.beam.apache.org%3E

Etienne

On 31/05/2017 at 11:12, Aljoscha Krettek wrote:
Thanks for banging these out, Lukasz. I’ll try and read them all this week.

We’re also planning to add support for the Fn API to the Flink Runner so that
we can execute Python programs. I’m sure we’ll get some valuable feedback for
you while doing that.

On 26. May 2017, at 22:49, Lukasz Cwik <lc...@google.com.INVALID> wrote:

I would like to share another document about the Fn API. This document
specifically discusses how to access side inputs, access remote references
(e.g. large iterables for hot keys produced by a GBK), and support user
state.
https://s.apache.org/beam-fn-state-api-and-bundle-processing

The document does require a strong foundation in the Apache Beam model and
a good understanding of the prior shared docs:
* How to process a bundle: https://s.apache.org/beam-fn-api-processing-a-bundle
* How to send and receive data: https://s.apache.org/beam-fn-api-send-and-receive-data

I could really use the help of runner contributors to review the caching
semantics within the SDK harness and whether they would work well for the
runner they contribute to the most.

On Sun, May 21, 2017 at 6:40 PM, Lukasz Cwik <lc...@google.com> wrote:

Manu, the goal is to share here initially, update the docs addressing
people's comments, and then publish them on the website once they are
stable enough.

On Sun, May 21, 2017 at 5:54 PM, Manu Zhang <owenzhang1...@gmail.com> wrote:

Thanks Lukasz. The following two links were somehow incorrectly formatted
in your mail.

* How to process a bundle:
https://s.apache.org/beam-fn-api-processing-a-bundle
* How to send and receive data:
https://s.apache.org/beam-fn-api-send-and-receive-data

By the way, is there a way to find them from the Beam website?


On Fri, May 19, 2017 at 6:44 AM Lukasz Cwik <lc...@google.com.invalid> wrote:

Now that I'm back from vacation and the 2.0.0 release is not taking all my
time, I am focusing my attention on the Beam Portability framework,
specifically the Fn API, so that we can get Python and other language
integrations working with any runner.

For newcomers, I would like to reshare the overview:
https://s.apache.org/beam-fn-api

And for those of you who have been following this thread and contributors
focusing on Runner integration with Apache Beam:
* How to process a bundle: https://s.apache.org/beam-fn-api-processing-a-bundle
* How to send and receive data: https://s.apache.org/beam-fn-api-send-and-receive-data

If you want to dive deeper, you should look at:
* Runner API Protobuf: https://github.com/apache/beam/blob/master/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
* Fn API Protobuf: https://github.com/apache/beam/blob/master/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
* Java SDK Harness: https://github.com/apache/beam/tree/master/sdks/java/harness
* Python SDK Harness: https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/worker

Next I'm planning on talking about the Beam Fn State API and will need help
from Runner contributors to talk about caching semantics and key spaces and
whether the integrations mesh well with current Runner implementations. The
State API is meant to support user state, side inputs, and re-iteration for
large values produced by GroupByKey.

On Tue, Jan 24, 2017 at 9:46 AM, Lukasz Cwik <lc...@google.com> wrote:

Yes, I was using a Pipeline that was:
Read (10 GiB of KVs, 10,000,000 values) -> GBK -> IdentityParDo (a batch
pipeline in the global window using the default trigger)

In Google Cloud Dataflow, the shuffle step uses the binary representation to
compare keys, so the above pipeline would normally be converted to the
following two stages:
Read -> GBK Writer
GBK Reader -> IdentityParDo

Note that the GBK Writer and GBK Reader need to use a coder to encode and
decode the value.

When using the Fn API, those two stages were expanded by the Fn API
crossings, each of which uses a gRPC Write/Read pair:
Read -> gRPC Write -> gRPC Read -> GBK Writer
GBK Reader -> gRPC Write -> gRPC Read -> IdentityParDo
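
To make the coder arithmetic concrete, here is a small hypothetical model (not
Beam code) that counts coder calls per element for the stage layouts above.
It assumes, as in the naive prototype, that every gRPC Write encodes and every
gRPC Read decodes, that the GBK Writer encodes and the GBK Reader decodes, and
that Read and IdentityParDo do not touch the element coder.

```java
import java.util.List;

/**
 * Hypothetical model (not Beam code) of how many times the element coder runs
 * per element in a fused stage, under the assumptions stated above.
 */
class CoderCallModel {
    /** Steps of a stage and how many coder calls (encode or decode) each makes per element. */
    enum Step {
        READ(0),           // assumed not to use the element coder
        GRPC_WRITE(1),     // naive prototype: encode before sending over gRPC
        GRPC_READ(1),      // naive prototype: decode after receiving from gRPC
        GBK_WRITER(1),     // encode for the shuffle
        GBK_READER(1),     // decode from the shuffle
        IDENTITY_PARDO(0); // passes elements through unchanged

        final int coderCalls;

        Step(int coderCalls) {
            this.coderCalls = coderCalls;
        }
    }

    /** Sums the coder calls made for a single element flowing through the stage. */
    static int coderCallsPerElement(List<Step> stage) {
        int total = 0;
        for (Step step : stage) {
            total += step.coderCalls;
        }
        return total;
    }
}
```

Under these assumptions Read -> GBK Writer costs one coder call per element,
while Read -> gRPC Write -> gRPC Read -> GBK Writer costs three; the second
stage goes from one call to three in the same way.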

In my naive prototype implementation, the coder was used to encode elements
at the gRPC steps. This meant that the coder was encoding/decoding/encoding
in the first stage and decoding/encoding/decoding in the second stage. This
tripled the number of times the coder was invoked per element. This
additional use of the coder accounted for ~12% (80% of the 15%) of the extra
execution time.

This implementation is quite inefficient and would benefit from merging the
gRPC Read + GBK Writer into one actor, and the GBK Reader + gRPC Write into
another, allowing for the creation of a fast path that can skip parts of the
decode/encode cycle through the coder. By using a byte array view over the
logical stream, one can minimize the number of byte array copies, which
plagued my naive implementation. This can be done by only parsing the element
boundaries out of the stream to produce those logical byte array views. I
have a very rough estimate that performing this optimization would reduce the
12% overhead to somewhere between 4% and 6%.
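
A minimal sketch of the byte-array-view idea, assuming a hypothetical framing
in which each encoded element is preceded by a 4-byte big-endian length (real
Beam coders define their own framing, so a real boundary scan would have to
follow the actual coder). Only the length prefixes are read; each element
becomes a ByteBuffer view over the same backing bytes, so no element bytes are
copied and the element coder is never invoked.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/**
 * Splits a logical stream into per-element views without copying element bytes
 * and without invoking the element coder. Assumes a hypothetical framing where
 * each element is preceded by a 4-byte big-endian length.
 */
class ElementBoundaryParser {
    static List<ByteBuffer> splitIntoViews(ByteBuffer stream) {
        List<ByteBuffer> views = new ArrayList<>();
        // duplicate() gives an independent position/limit over the same bytes,
        // so the caller's buffer position is left untouched.
        ByteBuffer cursor = stream.duplicate();
        while (cursor.remaining() >= Integer.BYTES) {
            int length = cursor.getInt(); // only the boundary is parsed
            if (length < 0 || length > cursor.remaining()) {
                throw new IllegalArgumentException("Corrupt or truncated element of length " + length);
            }
            // slice() shares the backing bytes; we only narrow its limit.
            ByteBuffer view = cursor.slice();
            view.limit(length);
            views.add(view);
            cursor.position(cursor.position() + length); // skip over the element body
        }
        if (cursor.hasRemaining()) {
            throw new IllegalArgumentException("Stream ends with a partial length prefix");
        }
        return views;
    }
}
```

Because the views share the original buffer, they stay valid only as long as
that buffer is not reused for the next chunk of the stream.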

The remaining 3% (15% - 12%) overhead went to many parts of gRPC:
* marshalling/unmarshalling protos
* handling/managing the socket
* flow control
* ...
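
Spelled out, the overhead split above is (percentages as reported in this
thread):

```latex
\begin{aligned}
\text{coder overhead} &= 0.80 \times 15\% = 12\% \\
\text{other gRPC overhead} &= 15\% - 12\% = 3\%
\end{aligned}
```

If the rough 4% to 6% estimate for the optimized coder path holds and the ~3%
gRPC share is assumed unchanged, the total overhead would come to roughly 7%
to 9%; that projection is back-of-the-envelope, not a measurement.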

Finally, I did try experiments with different buffer sizes (10KB, 100KB,
1000KB), flow control (separate thread[1] vs same thread with phaser[2]), and
channel type [3] (NIO, epoll, domain socket), but coder overhead easily
dominated the differences in these other experiments.

Further analysis would need to be done to more accurately distill this down.

1: https://github.com/lukecwik/incubator-beam/blob/fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/BufferingStreamObserver.java
2: https://github.com/lukecwik/incubator-beam/blob/fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DirectStreamObserver.java
3: https://github.com/lukecwik/incubator-beam/blob/fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/ManagedChannelFactory.java
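
For readers unfamiliar with the buffer-size experiment, the general idea of
size-based outbound buffering can be sketched as follows. This is a
hypothetical illustration, not the BufferingStreamObserver linked in [1]: the
class name, the Consumer<byte[]> sink standing in for a gRPC stream, and the
flush policy are all assumptions.

```java
import java.io.ByteArrayOutputStream;
import java.util.function.Consumer;

/**
 * Hypothetical size-based outbound buffer: encoded elements are accumulated and
 * handed to a sink (standing in for a gRPC stream) as one chunk once the
 * buffered size reaches a configurable threshold.
 */
class SizeBoundedBuffer {
    private final int flushThresholdBytes;
    private final Consumer<byte[]> sink;
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();

    SizeBoundedBuffer(int flushThresholdBytes, Consumer<byte[]> sink) {
        this.flushThresholdBytes = flushThresholdBytes;
        this.sink = sink;
    }

    /** Appends one already-encoded element and flushes if the threshold is reached. */
    void accept(byte[] encodedElement) {
        buffer.write(encodedElement, 0, encodedElement.length);
        if (buffer.size() >= flushThresholdBytes) {
            flush();
        }
    }

    /** Sends whatever is buffered (e.g. at the end of a bundle); no-op when empty. */
    void flush() {
        if (buffer.size() > 0) {
            sink.accept(buffer.toByteArray());
            buffer.reset();
        }
    }
}
```

With a 10KB threshold the same elements go out as many small messages, with
1000KB as a few large ones; larger buffers amortize per-message overhead at the
cost of latency and memory.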


On Tue, Jan 24, 2017 at 8:04 AM, Ismaël Mejía <ieme...@gmail.com> wrote:
Awesome job, Lukasz. Excellent! I have to confess the first time I heard
about the Fn API idea I was a bit incredulous, but you are making it real,
amazing!

Just one question from your document: you said that 80% of the extra (15%)
time goes into encoding and decoding the data for your test case. Can you
expand on your current ideas to improve this? (I am not sure I completely
understand the issue.)


On Mon, Jan 23, 2017 at 7:10 PM, Lukasz Cwik <lc...@google.com.invalid> wrote:

Responded inline.

On Sat, Jan 21, 2017 at 8:20 AM, Amit Sela <amitsel...@gmail.com> wrote:
This is truly amazing, Luke!

If I understand this right, the runner executing the DoFn will delegate the
function code and input data (and state, coders, etc.) to the container where
it will execute with the user's SDK of choice, right?

Yes, that is correct.


I wonder how the containers relate to the underlying engine's worker
processes? Is it 1-1, one container per worker? If there's less "work" for
the worker's Java process (for example) now and it becomes a sort of
"dispatcher", would that change the resource allocation commonly used for the
same Pipeline so that the worker processes would require fewer resources,
while giving those to the container?

I think with the four services (control, data, state, logging) you can go
with a 1-1 relationship or break it up at a finer grain and dedicate some
machines to specific tasks. For example, you could have a few machines
dedicated to log aggregation, which all the workers push their logs to.
Similarly, you could have some machines with a lot of memory, which would be
better suited to doing shuffles in memory, and this cluster of high-memory
machines could then front the data service. I believe there is a lot of
flexibility based upon what a runner can do and what it specializes in, and
that with more effort come more possibilities, albeit with increased internal
complexity.

The layout of resources depends on whether the services and SDK containers
are co-hosted on the same machine or whether a different architecture is in
play. In a co-hosted configuration, it seems likely that the SDK container
will get more resources, but this depends on the runner and the pipeline
shape (shuffle-heavy pipelines will look different from ParDo-dominated
pipelines).


About executing sub-graphs, would it be true to say that as long as there's
no shuffle, you could keep executing in the same container? Meaning that the
graph is broken into sub-graphs by shuffles?

The only thing that is required is that the Apache Beam model is preserved,
so typical break points will be at shuffles and language crossing points
(e.g. Python ParDo -> Java ParDo). A runner is free to break up the graph
even more for other reasons.
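
A simplified sketch of that cutting rule for a linear pipeline: start a new
stage at every GroupByKey (splitting it into a writer and a reader, as in the
Read -> GBK Writer / GBK Reader -> IdentityParDo example above) and at every
change of SDK language. The Transform type and the language strings are
hypothetical; real runners operate on a DAG and, as noted, may cut for other
reasons too.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified sketch: cut a linear pipeline into stages at GroupByKeys and at
 * SDK language crossings. Transform is a hypothetical stand-in for a pipeline node.
 */
class StagePartitioner {
    static final class Transform {
        final String name;
        final String language; // SDK the transform runs in, e.g. "java" or "python"
        final boolean isGroupByKey;

        Transform(String name, String language, boolean isGroupByKey) {
            this.name = name;
            this.language = language;
            this.isGroupByKey = isGroupByKey;
        }
    }

    static List<List<String>> partition(List<Transform> pipeline) {
        List<List<String>> stages = new ArrayList<>();
        List<String> current = new ArrayList<>();
        String currentLanguage = null; // language of the last SDK transform in this stage
        for (Transform t : pipeline) {
            if (t.isGroupByKey) {
                // Shuffle: end this stage with a GBK write, start the next with a GBK read.
                current.add(t.name + " Writer");
                stages.add(current);
                current = new ArrayList<>();
                current.add(t.name + " Reader");
                currentLanguage = null; // the runner-side read can feed any SDK
                continue;
            }
            if (currentLanguage != null && !currentLanguage.equals(t.language)) {
                // Language crossing, e.g. Python ParDo -> Java ParDo.
                stages.add(current);
                current = new ArrayList<>();
            }
            current.add(t.name);
            currentLanguage = t.language;
        }
        if (!current.isEmpty()) {
            stages.add(current);
        }
        return stages;
    }
}
```

For example, Read -> GBK -> IdentityParDo (all Java) yields [Read, GBK Writer]
and [GBK Reader, IdentityParDo], while a Python ParDo followed by a Java ParDo
yields two single-transform stages.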


I have to dig in deeper, so I could have more questions ;-)
Thanks, Luke!

On Sat, Jan 21, 2017 at 1:52 AM Lukasz Cwik <lc...@google.com.invalid> wrote:

I updated the PR description to contain the same.

I would start by looking at the API/object model definitions found in
beam_fn_api.proto
<https://github.com/lukecwik/incubator-beam/blob/fn_api/sdks/common/fn-api/src/main/proto/beam_fn_api.proto>.
Then depending on your interest, look at the following:
* FnHarness.java
<https://github.com/lukecwik/incubator-beam/blob/fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java>
is the main entry point.
* org.apache.beam.fn.harness.data
<https://github.com/lukecwik/incubator-beam/tree/fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data>
contains the most interesting bits of code since it is able to multiplex a
gRPC stream into multiple logical streams of elements bound for multiple
concurrent process bundle requests. It also contains the code to take
multiple logical outbound streams and multiplex them back onto a gRPC stream.
* org.apache.beam.runners.core
<https://github.com/lukecwik/incubator-beam/tree/fn_api/sdks/java/harness/src/main/java/org/apache/beam/runners/core>
contains additional runners akin to the DoFnRunner found in runners-core to
support sources and gRPC endpoints.
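
A minimal sketch of the multiplexing idea described for
org.apache.beam.fn.harness.data, assuming each chunk on the physical stream is
tagged with the id of the process bundle request it belongs to. The Chunk type
and its field names are hypothetical and are not taken from beam_fn_api.proto;
a List stands in for the outbound gRPC stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Hypothetical sketch: one physical stream carries chunks for several
 * concurrently running bundles, each tagged with its bundle's instruction id.
 */
class LogicalStreamMultiplexer {
    /** A chunk on the physical stream; field names are illustrative only. */
    static final class Chunk {
        final String instructionId;
        final byte[] payload;

        Chunk(String instructionId, byte[] payload) {
            this.instructionId = instructionId;
            this.payload = payload;
        }
    }

    private final Map<String, BlockingQueue<byte[]>> inbound = new ConcurrentHashMap<>();
    private final List<Chunk> outbound = new ArrayList<>(); // stands in for the outbound gRPC stream

    /** Returns (creating if needed) the logical inbound stream for one bundle. */
    BlockingQueue<byte[]> logicalStream(String instructionId) {
        return inbound.computeIfAbsent(instructionId, id -> new LinkedBlockingQueue<>());
    }

    /** Demultiplexes a chunk received on the physical stream into its bundle's queue. */
    void onChunk(Chunk chunk) {
        logicalStream(chunk.instructionId).add(chunk.payload);
    }

    /** Multiplexes an outbound payload from one bundle onto the shared physical stream. */
    synchronized void send(String instructionId, byte[] payload) {
        outbound.add(new Chunk(instructionId, payload));
    }

    /** Snapshot of what has been written to the physical stream so far. */
    synchronized List<Chunk> sentChunks() {
        return new ArrayList<>(outbound);
    }
}
```

Each concurrently running bundle reads only from its own queue, so a slow
bundle does not hold up chunks destined for another one (the unbounded queues
are a simplification; a real implementation would need back-pressure).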

Unless you're really interested in how domain socket, epoll, and NIO channel
factories or stream readiness callbacks work in gRPC, I would avoid the
packages org.apache.beam.fn.harness.channel and
org.apache.beam.fn.harness.stream. Similarly, I would avoid
org.apache.beam.fn.harness.fn and org.apache.beam.fn.harness.fake as they
don't add anything meaningful to the API.

Code package descriptions:

org.apache.beam.fn.harness.FnHarness: main entry point
org.apache.beam.fn.harness.control: Control service client and individual request handlers
org.apache.beam.fn.harness.data: Data service client and logical stream multiplexing
org.apache.beam.runners.core: Additional runners akin to the DoFnRunner found in runners-core to support sources and gRPC endpoints
org.apache.beam.fn.harness.logging: Logging client implementation and JUL logging handler adapter
org.apache.beam.fn.harness.channel: gRPC channel management
org.apache.beam.fn.harness.stream: gRPC stream management
org.apache.beam.fn.harness.fn: Java 8 functional interface extensions

On Fri, Jan 20, 2017 at 1:26 PM, Kenneth Knowles <k...@google.com.invalid> wrote:

This is awesome! Any chance you could roadmap the PR for us with some links
into the most interesting bits?

On Fri, Jan 20, 2017 at 12:19 PM, Robert Bradshaw <rober...@google.com.invalid> wrote:

Also, note that we can still support the "simple" case. For example, if the
user supplies us with a jar file (as they do now), a runner could launch it
as a subprocess and communicate with it via this same Fn API, or install it
in a fixed container itself; the user doesn't *need* to know about docker or
manually manage containers (and indeed the Fn API could be used in-process,
cross-process, cross-container, and even cross-machine).

However, docker provides a nice cross-language way of specifying the
environment, including all dependencies (especially for languages like Python
or C where the equivalent of a cross-platform, self-contained jar isn't as
easy to produce), and is strictly more powerful and flexible (specifically,
it isolates the runtime environment, and one can even use it for local
testing).

Slicing a worker up like this without sacrificing performance is an
ambitious goal, but essential to the story of being able to mix and match
runners and SDKs arbitrarily, and I think this is a great start.

On Fri, Jan 20, 2017 at 9:39 AM, Lukasz Cwik <lc...@google.com.invalid> wrote:
You're correct: a docker container is created that contains the execution
environment the user wants, or the user re-uses an existing one (allowing a
user to embed all their code/dependencies or use a container that can deploy
code/dependencies on demand).
A user creates a pipeline saying which docker container they want to use
(this starts to allow for multiple container definitions within a single
pipeline to support multiple languages, versioning, ...).
A runner would then be responsible for launching one or more of these
containers in a cluster manager of its choice (scaling the number of
instances up or down depending on demand/load/...).
A runner then interacts with the docker containers over the gRPC service
definitions to delegate processing to them.


On Fri, Jan 20, 2017 at 4:56 AM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:

Hi Luke,

that's really great and very promising!

It's really ambitious but I like the idea. Just to clarify: the purpose of
using gRPC is that once the docker container is running, we can "interact"
with the container to spread and delegate processing to the docker
container, correct?
The users/devops have to set up the docker containers as a prerequisite.
Then, the "location" of the containers (a kind of container registry) is set
via the pipeline options and used by gRPC?

Thanks Luke !

Regards
JB


On 01/19/2017 03:56 PM, Lukasz Cwik wrote:

I have been prototyping several components towards the Beam technical vision
of being able to execute an arbitrary language using an arbitrary runner.

I would like to share this overview [1] of what I have been working towards.
I am also sharing this PR [2] with a proposed API, service definitions and a
partial implementation.

1: https://s.apache.org/beam-fn-api
2: https://github.com/apache/beam/pull/1801

Please comment on the overview within this thread, and leave any specific
code comments on the PR directly.

Luke


--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com



