Thanks for banging these out Lukasz. I’ll try and read them all this week.

We’re also planning to add support for the Fn API to the Flink Runner so that 
we can execute Python programs. I’m sure we’ll get some valuable feedback for 
you while doing that.

> On 26. May 2017, at 22:49, Lukasz Cwik <lc...@google.com.INVALID> wrote:
> 
> I would like to share another document about the Fn API. This document
> specifically discusses how to access side inputs, access remote references
> (e.g. large iterables for hot keys produced by a GBK), and support user
> state.
> https://s.apache.org/beam-fn-state-api-and-bundle-processing
> 
> The document does require a strong foundation in the Apache Beam model and
> a good understanding of the prior shared docs:
> * How to process a bundle: https://s.apache.org/beam-fn-api
> -processing-a-bundle
> * How to send and receive data: https://s.apache.org/beam-fn-api
> -send-and-receive-data
> 
> I could really use the help of runner contributors to review the caching
> semantics within the SDK harness and whether they would work well for the
> runner they contribute to the most.
> 
> On Sun, May 21, 2017 at 6:40 PM, Lukasz Cwik <lc...@google.com> wrote:
> 
>> Manu, the goal is to share here initially, update the docs addressing
>> people's comments, and then publish them on the website once they are
>> stable enough.
>> 
>> On Sun, May 21, 2017 at 5:54 PM, Manu Zhang <owenzhang1...@gmail.com>
>> wrote:
>> 
>>> Thanks Lukasz. The following two links were somehow incorrectly formatted
>>> in your mail.
>>> 
>>> * How to process a bundle:
>>> https://s.apache.org/beam-fn-api-processing-a-bundle
>>> * How to send and receive data:
>>> https://s.apache.org/beam-fn-api-send-and-receive-data
>>> 
>>> By the way, is there a way to find them from the Beam website ?
>>> 
>>> 
>>> On Fri, May 19, 2017 at 6:44 AM Lukasz Cwik <lc...@google.com.invalid>
>>> wrote:
>>> 
>>>> Now that I'm back from vacation and the 2.0.0 release is not taking all
>>> my
>>>> time, I am focusing my attention on working on the Beam Portability
>>>> framework, specifically the Fn API so that we can get Python and other
>>>> language integrations work with any runner.
>>>> 
>>>> For new comers, I would like to reshare the overview:
>>>> https://s.apache.org/beam-fn-api
>>>> 
>>>> And for those of you who have been following this thread and
>>> contributors
>>>> focusing on Runner integration with Apache Beam:
>>>> * How to process a bundle: https://s.apache.org/beam-fn-a
>>> pi-processing-a-
>>>> bundle
>>>> * How to send and receive data: https://s.apache.org/
>>>> beam-fn-api-send-and-receive-data
>>>> 
>>>> If you want to dive deeper, you should look at:
>>>> * Runner API Protobuf: https://github.com/apache/beam/blob/master/sdks/
>>>> common/runner-api/src/main/proto/beam_runner_api.proto
>>>> <https://github.com/apache/beam/blob/master/sdks/common/runn
>>> er-api/src/main/proto/beam_runner_api.proto>
>>>> * Fn API Protobuf: https://github.com/apache/beam/blob/master/sdks/
>>>> common/fn-api/src/main/proto/beam_fn_api.proto
>>>> <https://github.com/apache/beam/blob/master/sdks/common/fn-
>>> api/src/main/proto/beam_fn_api.proto>
>>>> * Java SDK Harness: https://github.com/apache/beam/tree/master/sdks/
>>>> java/harness
>>>> <https://github.com/apache/beam/tree/master/sdks/java/harness>
>>>> * Python SDK Harness: https://github.com/apache/beam/tree/master/sdks/
>>>> python/apache_beam/runners/worker
>>>> <https://github.com/apache/beam/tree/master/sdks/python/apac
>>> he_beam/runners/worker>
>>>> 
>>>> Next I'm planning on talking about Beam Fn State API and will need help
>>>> from Runner contributors to talk about caching semantics and key spaces
>>> and
>>>> whether the integrations mesh well with current Runner implementations.
>>> The
>>>> State API is meant to support user state, side inputs, and re-iteration
>>> for
>>>> large values produced by GroupByKey.
>>>> 
>>>> On Tue, Jan 24, 2017 at 9:46 AM, Lukasz Cwik <lc...@google.com> wrote:
>>>> 
>>>>> Yes, I was using a Pipeline that was:
>>>>> Read(10 GiBs of KV (10,000,000 values)) -> GBK -> IdentityParDo (a
>>> batch
>>>>> pipeline in the global window using the default trigger)
>>>>> 
>>>>> In Google Cloud Dataflow, the shuffle step uses the binary
>>> representation
>>>>> to compare keys, so the above pipeline would normally be converted to
>>> the
>>>>> following two stages:
>>>>> Read -> GBK Writer
>>>>> GBK Reader -> IdentityParDo
>>>>> 
>>>>> Note that the GBK Writer and GBK Reader need to use a coder to encode
>>> and
>>>>> decode the value.
>>>>> 
>>>>> When using the Fn API, those two stages expanded because of the Fn Api
>>>>> crossings using a gRPC Write/Read pair:
>>>>> Read -> gRPC Write -> gRPC Read -> GBK Writer
>>>>> GBK Reader -> gRPC Write -> gRPC Read -> IdentityParDo
>>>>> 
>>>>> In my naive prototype implementation, the coder was used to encode
>>>>> elements at the gRPC steps. This meant that the coder was
>>>>> encoding/decoding/encoding in the first stage and
>>>>> decoding/encoding/decoding in the second stage. This tripled the
>>> amount
>>>> of
>>>>> times the coder was being invoked per element. This additional use of
>>> the
>>>>> coder accounted for ~12% (80% of the 15%) of the extra execution time.
>>>> This
>>>>> implementation is quite inefficient and would benefit from merging the
>>>> gRPC
>>>>> Read + GBK Writer into one actor and also the GBK Reader + gRPC Write
>>>> into
>>>>> another actor allowing for the creation of a fast path that can skip
>>>> parts
>>>>> of the decode/encode cycle through the coder. By using a byte array
>>> view
>>>>> over the logical stream, one can minimize the number of byte array
>>> copies
>>>>> which plagued my naive implementation. This can be done by only
>>> parsing
>>>> the
>>>>> element boundaries out of the stream to produce those logical byte
>>> array
>>>>> views. I have a very rough estimate that performing this optimization
>>>> would
>>>>> reduce the 12% overhead to somewhere between 4% and 6%.
>>>>> 
>>>>> The remaining 3% (15% - 12%) overhead went to many parts of gRPC:
>>>>> marshalling/unmarshalling protos
>>>>> handling/managing the socket
>>>>> flow control
>>>>> ...
>>>>> 
>>>>> Finally, I did try experiments with different buffer sizes (10KB,
>>> 100KB,
>>>>> 1000KB), flow control (separate thread[1] vs same thread with
>>> phaser[2]),
>>>>> and channel type [3] (NIO, epoll, domain socket), but coder overhead
>>>> easily
>>>>> dominated the differences in these other experiments.
>>>>> 
>>>>> Further analysis would need to be done to more accurately distill this
>>>>> down.
>>>>> 
>>>>> 1: https://github.com/lukecwik/incubator-beam/blob/
>>>>> fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/ha
>>> rness/stream/
>>>>> BufferingStreamObserver.java
>>>>> 2: https://github.com/lukecwik/incubator-beam/blob/
>>>>> fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/ha
>>> rness/stream/
>>>>> DirectStreamObserver.java
>>>>> 3: https://github.com/lukecwik/incubator-beam/blob/
>>>>> 
>>>> fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/ha
>>> rness/channel/
>>>>> ManagedChannelFactory.java
>>>>> 
>>>>> 
>>>>> On Tue, Jan 24, 2017 at 8:04 AM, Ismaël Mejía <ieme...@gmail.com>
>>> wrote:
>>>>> 
>>>>>> Awesome job Lukasz, Excellent, I have to confess the first time I
>>> heard
>>>>>> about
>>>>>> the Fn API idea I was a bit incredulous, but you are making it real,
>>>>>> amazing!
>>>>>> 
>>>>>> Just one question from your document, you said that 80% of the extra
>>>> (15%)
>>>>>> time
>>>>>> goes into encoding and decoding the data for your test case, can you
>>>>>> expand
>>>>>> in
>>>>>> your current ideas to improve this? (I am not sure I completely
>>>> understand
>>>>>> the
>>>>>> issue).
>>>>>> 
>>>>>> 
>>>>>> On Mon, Jan 23, 2017 at 7:10 PM, Lukasz Cwik
>>> <lc...@google.com.invalid>
>>>>>> wrote:
>>>>>> 
>>>>>>> Responded inline.
>>>>>>> 
>>>>>>> On Sat, Jan 21, 2017 at 8:20 AM, Amit Sela <amitsel...@gmail.com>
>>>>>> wrote:
>>>>>>> 
>>>>>>>> This is truly amazing Luke!
>>>>>>>> 
>>>>>>>> If I understand this right, the runner executing the DoFn will
>>>>>> delegate
>>>>>>> the
>>>>>>>> function code and input data (and state, coders, etc.) to the
>>>>>> container
>>>>>>>> where it will execute with the user's SDK of choice, right ?
>>>>>>> 
>>>>>>> 
>>>>>>> Yes, that is correct.
>>>>>>> 
>>>>>>> 
>>>>>>>> I wonder how the containers relate to the underlying engine's
>>> worker
>>>>>>>> processes ? is it a 1-1, container per worker ? if there's less
>>>> "work"
>>>>>>> for
>>>>>>>> the worker's Java process (for example) now and it becomes a
>>> sort of
>>>>>>>> "dispatcher", would that change the resource allocation commonly
>>>> used
>>>>>> for
>>>>>>>> the same Pipeline so that the worker processes would require less
>>>>>>>> resources, while giving those to the container ?
>>>>>>>> 
>>>>>>> 
>>>>>>> I think with the four services (control, data, state, logging) you
>>> can
>>>>>> go
>>>>>>> with a 1-1 relationship or break it up more finely grained and
>>>> dedicate
>>>>>>> some machines to have specific tasks. Like you could have a few
>>>> machines
>>>>>>> dedicated to log aggregation which all the workers push their logs
>>> to.
>>>>>>> Similarly, you could have some machines that have a lot of memory
>>>> which
>>>>>>> would be better to be able to do shuffles in memory and then this
>>>>>> cluster
>>>>>>> of high memory machines could front the data service. I believe
>>> there
>>>>>> is a
>>>>>>> lot of flexibility based upon what a runner can do and what it
>>>>>> specializes
>>>>>>> in and believe that with more effort comes more possibilities
>>> albeit
>>>>>> with
>>>>>>> increased internal complexity.
>>>>>>> 
>>>>>>> The layout of resources depends on whether the services and SDK
>>>>>> containers
>>>>>>> are co-hosted on the same machine or whether there is a different
>>>>>>> architecture in play. In a co-hosted configuration, it seems likely
>>>> that
>>>>>>> the SDK container will get more resources but is dependent on the
>>>> runner
>>>>>>> and pipeline shape (shuffle heavy dominated pipelines will look
>>>>>> different
>>>>>>> then ParDo dominated pipelines).
>>>>>>> 
>>>>>>> 
>>>>>>>> About executing sub-graphs, would it be true to say that as long
>>> as
>>>>>>> there's
>>>>>>>> no shuffle, you could keep executing in the same container ?
>>> meaning
>>>>>> that
>>>>>>>> the graph is broken into sub-graphs by shuffles ?
>>>>>>>> 
>>>>>>> 
>>>>>>> The only thing that is required is that the Apache Beam model is
>>>>>> preserved
>>>>>>> so typical break points will be at shuffles and language crossing
>>>> points
>>>>>>> (e.g. Python ParDo -> Java ParDo). A runner is free to break up the
>>>>>> graph
>>>>>>> even more for other reasons.
>>>>>>> 
>>>>>>> 
>>>>>>>> I have to dig-in deeper, so I could have more questions ;-)
>>> thanks
>>>>>> Luke!
>>>>>>>> 
>>>>>>>> On Sat, Jan 21, 2017 at 1:52 AM Lukasz Cwik
>>>> <lc...@google.com.invalid
>>>>>>> 
>>>>>>>> wrote:
>>>>>>>> 
>>>>>>>>> I updated the PR description to contain the same.
>>>>>>>>> 
>>>>>>>>> I would start by looking at the API/object model definitions
>>> found
>>>>>> in
>>>>>>>>> beam_fn_api.proto
>>>>>>>>> <
>>>>>>>>> https://github.com/lukecwik/incubator-beam/blob/fn_api/
>>>>>>>> sdks/common/fn-api/src/main/proto/beam_fn_api.proto
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> Then depending on your interest, look at the following:
>>>>>>>>> * FnHarness.java
>>>>>>>>> <
>>>>>>>>> https://github.com/lukecwik/incubator-beam/blob/fn_api/
>>>>>>>> sdks/java/harness/src/main/java/org/apache/beam/fn/
>>>>>>> harness/FnHarness.java
>>>>>>>>>> 
>>>>>>>>> is the main entry point.
>>>>>>>>> * org.apache.beam.fn.harness.data
>>>>>>>>> <
>>>>>>>>> https://github.com/lukecwik/incubator-beam/tree/fn_api/
>>>>>>>> sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data
>>>>>>>>>> 
>>>>>>>>> contains the most interesting bits of code since it is able to
>>>>>>> multiplex
>>>>>>>> a
>>>>>>>>> gRPC stream into multiple logical streams of elements bound for
>>>>>>> multiple
>>>>>>>>> concurrent process bundle requests. It also contains the code
>>> to
>>>>>> take
>>>>>>>>> multiple logical outbound streams and multiplex them back onto
>>> a
>>>>>> gRPC
>>>>>>>>> stream.
>>>>>>>>> * org.apache.beam.runners.core
>>>>>>>>> <
>>>>>>>>> https://github.com/lukecwik/incubator-beam/tree/fn_api/
>>>>>>>> sdks/java/harness/src/main/java/org/apache/beam/runners/core
>>>>>>>>>> 
>>>>>>>>> contains additional runners akin to the DoFnRunner found in
>>>>>>> runners-core
>>>>>>>> to
>>>>>>>>> support sources and gRPC endpoints.
>>>>>>>>> 
>>>>>>>>> Unless your really interested in how domain sockets, epoll, nio
>>>>>> channel
>>>>>>>>> factories or how stream readiness callbacks work in gRPC, I
>>> would
>>>>>> avoid
>>>>>>>> the
>>>>>>>>> packages org.apache.beam.fn.harness.channel and
>>>>>>>>> org.apache.beam.fn.harness.stream. Similarly I would avoid
>>>>>>>>> org.apache.beam.fn.harness.fn and
>>> org.apache.beam.fn.harness.fake
>>>>>> as
>>>>>>>> they
>>>>>>>>> don't add anything meaningful to the api.
>>>>>>>>> 
>>>>>>>>> Code package descriptions:
>>>>>>>>> 
>>>>>>>>> org.apache.beam.fn.harness.FnHarness: main entry point
>>>>>>>>> org.apache.beam.fn.harness.control: Control service client and
>>>>>>>> individual
>>>>>>>>> request handlers
>>>>>>>>> org.apache.beam.fn.harness.data: Data service client and
>>> logical
>>>>>>> stream
>>>>>>>>> multiplexing
>>>>>>>>> org.apache.beam.runners.core: Additional runners akin to the
>>>>>> DoFnRunner
>>>>>>>>> found in runners-core to support sources and gRPC endpoints
>>>>>>>>> org.apache.beam.fn.harness.logging: Logging client
>>> implementation
>>>>>> and
>>>>>>>> JUL
>>>>>>>>> logging handler adapter
>>>>>>>>> org.apache.beam.fn.harness.channel: gRPC channel management
>>>>>>>>> org.apache.beam.fn.harness.stream: gRPC stream management
>>>>>>>>> org.apache.beam.fn.harness.fn: Java 8 functional interface
>>>>>> extensions
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> On Fri, Jan 20, 2017 at 1:26 PM, Kenneth Knowles
>>>>>>> <k...@google.com.invalid
>>>>>>>>> 
>>>>>>>>> wrote:
>>>>>>>>> 
>>>>>>>>>> This is awesome! Any chance you could roadmap the PR for us
>>> with
>>>>>> some
>>>>>>>>> links
>>>>>>>>>> into the most interesting bits?
>>>>>>>>>> 
>>>>>>>>>> On Fri, Jan 20, 2017 at 12:19 PM, Robert Bradshaw <
>>>>>>>>>> rober...@google.com.invalid> wrote:
>>>>>>>>>> 
>>>>>>>>>>> Also, note that we can still support the "simple" case. For
>>>>>>> example,
>>>>>>>>>>> if the user supplies us with a jar file (as they do now) a
>>>>>> runner
>>>>>>>>>>> could launch it as a subprocesses and communicate with it
>>> via
>>>>>> this
>>>>>>>>>>> same Fn API or install it in a fixed container itself--the
>>>> user
>>>>>>>>>>> doesn't *need* to know about docker or manually manage
>>>>>> containers
>>>>>>>> (and
>>>>>>>>>>> indeed the Fn API could be used in-process, cross-process,
>>>>>>>>>>> cross-container, and even cross-machine).
>>>>>>>>>>> 
>>>>>>>>>>> However docker provides a nice cross-language way of
>>>> specifying
>>>>>> the
>>>>>>>>>>> environment including all dependencies (especially for
>>>> languages
>>>>>>> like
>>>>>>>>>>> Python or C where the equivalent of a cross-platform,
>>>>>>> self-contained
>>>>>>>>>>> jar isn't as easy to produce) and is strictly more powerful
>>>> and
>>>>>>>>>>> flexible (specifically it isolates the runtime environment
>>> and
>>>>>> one
>>>>>>>> can
>>>>>>>>>>> even use it for local testing).
>>>>>>>>>>> 
>>>>>>>>>>> Slicing a worker up like this without sacrificing
>>> performance
>>>>>> is an
>>>>>>>>>>> ambitious goal, but essential to the story of being able to
>>>> mix
>>>>>> and
>>>>>>>>>>> match runners and SDKs arbitrarily, and I think this is a
>>>> great
>>>>>>>> start.
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> On Fri, Jan 20, 2017 at 9:39 AM, Lukasz Cwik
>>>>>>>> <lc...@google.com.invalid
>>>>>>>>>> 
>>>>>>>>>>> wrote:
>>>>>>>>>>>> Your correct, a docker container is created that contains
>>>> the
>>>>>>>>> execution
>>>>>>>>>>>> environment the user wants or the user re-uses an
>>> existing
>>>> one
>>>>>>>>>> (allowing
>>>>>>>>>>>> for a user to embed all their code/dependencies or use a
>>>>>>> container
>>>>>>>>> that
>>>>>>>>>>> can
>>>>>>>>>>>> deploy code/dependencies on demand).
>>>>>>>>>>>> A user creates a pipeline saying which docker container
>>> they
>>>>>> want
>>>>>>>> to
>>>>>>>>>> use
>>>>>>>>>>>> (this starts to allow for multiple container definitions
>>>>>> within a
>>>>>>>>>> single
>>>>>>>>>>>> pipeline to support multiple languages, versioning, ...).
>>>>>>>>>>>> A runner would then be responsible for launching one or
>>> more
>>>>>> of
>>>>>>>> these
>>>>>>>>>>>> containers in a cluster manager of their choice (scaling
>>> up
>>>> or
>>>>>>> down
>>>>>>>>> the
>>>>>>>>>>>> number of instances depending on demand/load/...).
>>>>>>>>>>>> A runner then interacts with the docker containers over
>>> the
>>>>>> gRPC
>>>>>>>>>> service
>>>>>>>>>>>> definitions to delegate processing to.
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> On Fri, Jan 20, 2017 at 4:56 AM, Jean-Baptiste Onofré <
>>>>>>>>> j...@nanthrax.net
>>>>>>>>>>> 
>>>>>>>>>>>> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>>> Hi Luke,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> that's really great and very promising !
>>>>>>>>>>>>> 
>>>>>>>>>>>>> It's really ambitious but I like the idea. Just to
>>> clarify:
>>>>>> the
>>>>>>>>>> purpose
>>>>>>>>>>> of
>>>>>>>>>>>>> using gRPC is once the docker container is running,
>>> then we
>>>>>> can
>>>>>>>>>>> "interact"
>>>>>>>>>>>>> with the container to spread and delegate processing to
>>> the
>>>>>>> docker
>>>>>>>>>>>>> container, correct ?
>>>>>>>>>>>>> The users/devops have to setup the docker containers as
>>>>>>>>> prerequisite.
>>>>>>>>>>>>> Then, the "location" of the containers (kind of
>>> container
>>>>>>>> registry)
>>>>>>>>> is
>>>>>>>>>>> set
>>>>>>>>>>>>> via the pipeline options and used by gRPC ?
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Thanks Luke !
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Regards
>>>>>>>>>>>>> JB
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> On 01/19/2017 03:56 PM, Lukasz Cwik wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> I have been prototyping several components towards the
>>>> Beam
>>>>>>>>> technical
>>>>>>>>>>>>>> vision of being able to execute an arbitrary language
>>>> using
>>>>>> an
>>>>>>>>>>> arbitrary
>>>>>>>>>>>>>> runner.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> I would like to share this overview [1] of what I have
>>>> been
>>>>>>>> working
>>>>>>>>>>>>>> towards. I also share this PR [2] with a proposed API,
>>>>>> service
>>>>>>>>>>> definitions
>>>>>>>>>>>>>> and partial implementation.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 1: https://s.apache.org/beam-fn-api
>>>>>>>>>>>>>> 2: https://github.com/apache/beam/pull/1801
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Please comment on the overview within this thread, and
>>> any
>>>>>>>> specific
>>>>>>>>>>> code
>>>>>>>>>>>>>> comments on the PR directly.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Luke
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>> --
>>>>>>>>>>>>> Jean-Baptiste Onofré
>>>>>>>>>>>>> jbono...@apache.org
>>>>>>>>>>>>> http://blog.nanthrax.net
>>>>>>>>>>>>> Talend - http://www.talend.com
>>>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>>> 
>>>> 
>>> 
>> 
>> 

Reply via email to