Re: [DISCUSS] Adding Some Sort of SideInputRunner

2016-05-03 Thread Aljoscha Krettek
Maybe, I'll try and figure something out. :-)

My problem was that the doc for StateInternals explicitly states that
access to state is always implicitly scoped to the key being processed. In
my understanding this was always the key of an element but it seems that it
can also be a more abstract key, such as the sharding key. The fact that
this could be the case was hidden away in code outside the SDK, it seems.

Thanks for your help!

On Tue, 3 May 2016 at 19:40 Kenneth Knowles  wrote:

> I think the answer to your questions might be StateNamespace.
>
> The lowest level of state is always key-scoped, while the StateNamespace
> indicates whether it is global to the key, further scoped to a particular
> window, or even scoped to a particular trigger. When the DoFn needs a side
> input, the key might actually be gone from the user's point of view. It is
> up to the StepContext to provide an appropriately-scoped StateInternals,
> usually by some consistent sharding key such as the key from the upstream
> GBK.
>
> I don't want to go too much into state accessed in the DoFn as I haven't
> yet got a chance to prepare and publish the design doc for that, and I want
> everyone to have access to it for any discussion.
>
> Does this help?
>
> On Tue, May 3, 2016 at 1:58 AM, Aljoscha Krettek 
> wrote:
>
> > I'm afraid I have yet another question. What's the interplay between the
> > state that holds the buffered main-input elements and possible per-key
> > state that might be used by the DoFn? I guess I'm not seeing all the
> > parts, but my problem is that one part (the buffering) requires a
> > different type of state scope than the other part (key-scoped state
> > access in the DoFn), while they both seem to be using the same
> > StateInternals from the step context. How does that work?
> >
> > Cheers,
> > Aljoscha
> >
> > On Thu, 28 Apr 2016 at 20:05 Kenneth Knowles 
> > wrote:
> >
> > > On Thu, Apr 28, 2016 at 10:19 AM, Aljoscha Krettek <aljos...@apache.org>
> > > wrote:
> > >
> > > > No worries :-) and thanks for the detailed answers!
> > > >
> > > > I still have one question, though: you wrote that "The side input is
> > > > considered ready when there has been any data output/added to the
> > > > PCollection that it is being read as a side input. So the upstream
> > > > trigger controls this." How does this work with side inputs that
> > > > consist of multiple elements, i.e. ListPCollectionView and
> > > > MapPCollectionView. For them, do we also consider the side input as
> > > > ready once the first element arrives? That's why I was wondering about
> > > > the triggers being responsible for deciding when a side input is ready.
> > > >
> > >
> > > Yes, just as you describe. The side input window becomes ready once it
> > > has any data. So, combining your items 2.5 and 3, you have a situation
> > > where main input elements may be combined with only a speculative
> > > subset of the side input data. They will not be reprocessed once more
> > > up-to-date side input values become known. Beyond this initial period
> > > of waiting for the very first firing of the side input window, there
> > > are no consistency restrictions/guarantees on main input vs side input
> > > windows or triggerings. It may be that for a given runner updating the
> > > side input with the new value happens at high latency so all the main
> > > input elements are processed and gone before the update goes through.
> > > It is a bit of a dangerous area for users. I'm pretty interested in
> > > ideas in this space.
> > >
> > > Kenn
> > >
> >
>


Re: [DISCUSS] Adding Some Sort of SideInputRunner

2016-05-03 Thread Kenneth Knowles
I think the answer to your questions might be StateNamespace.

The lowest level of state is always key-scoped, while the StateNamespace
indicates whether it is global to the key, further scoped to a particular
window, or even scoped to a particular trigger. When the DoFn needs a side
input, the key might actually be gone from the user's point of view. It is
up to the StepContext to provide an appropriately-scoped StateInternals,
usually by some consistent sharding key such as the key from the upstream
GBK.
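
As a rough illustration of the scoping described above, here is a minimal
Java sketch. Every type in it (Namespace, KeyScopedState, StepContextSketch)
is a hypothetical stand-in invented for this example, not the actual
StateNamespace/StateInternals/StepContext interfaces. It only shows the idea
that state is first selected by a sharding key and then addressed by a
namespace plus a tag, so buffered main-input elements and other state can
share one key-scoped store without colliding.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; none of these types are the real Beam interfaces.
final class StateScopingSketch {

  /** Stand-in for a state namespace: global to the key, or one window. */
  static final class Namespace {
    final String id;

    private Namespace(String id) {
      this.id = id;
    }

    static Namespace global() {
      return new Namespace("global");
    }

    static Namespace window(long startMillis, long endMillis) {
      return new Namespace("window[" + startMillis + "," + endMillis + ")");
    }
  }

  /** All state for ONE sharding key; cells are addressed by (namespace, tag). */
  static final class KeyScopedState {
    private final Map<String, List<String>> bags = new HashMap<>();

    List<String> bag(Namespace namespace, String tag) {
      return bags.computeIfAbsent(namespace.id + "/" + tag, k -> new ArrayList<>());
    }
  }

  /** Stand-in for the step context: picks the state for a sharding key. */
  static final class StepContextSketch {
    private final Map<String, KeyScopedState> stateByShardingKey = new HashMap<>();

    KeyScopedState stateFor(String shardingKey) {
      return stateByShardingKey.computeIfAbsent(shardingKey, k -> new KeyScopedState());
    }
  }

  public static void main(String[] args) {
    StepContextSketch context = new StepContextSketch();
    KeyScopedState shard = context.stateFor("shard-7");

    // Buffered main-input elements live in a window namespace ...
    shard.bag(Namespace.window(0, 60_000), "buffered-main-input").add("element-a");
    // ... while other state for the same sharding key uses the global namespace.
    shard.bag(Namespace.global(), "other-state").add("value-1");

    System.out.println(shard.bag(Namespace.window(0, 60_000), "buffered-main-input"));
    // A different sharding key sees none of the above.
    System.out.println(
        context.stateFor("shard-8").bag(Namespace.window(0, 60_000), "buffered-main-input"));
  }
}
```

The sharding key here is just a string; for a non-keyed PCollection it
would be whatever key the runner uses to shard the computation.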

I don't want to go too much into state accessed in the DoFn as I haven't
yet had a chance to prepare and publish the design doc for that, and I want
everyone to have access to it for any discussion.

Does this help?

On Tue, May 3, 2016 at 1:58 AM, Aljoscha Krettek 
wrote:

> I'm afraid I have yet another question. What's the interplay between the
> state that holds the buffered main-input elements and possible per-key
> state that might be used by the DoFn? I guess I'm not seeing all the parts,
> but my problem is that one part (the buffering) requires a different type
> of state scope than the other part (key-scoped state access in the DoFn),
> while they both seem to be using the same StateInternals from the step
> context. How does that work?
>
> Cheers,
> Aljoscha
>
> On Thu, 28 Apr 2016 at 20:05 Kenneth Knowles 
> wrote:
>
> > On Thu, Apr 28, 2016 at 10:19 AM, Aljoscha Krettek 
> > wrote:
> >
> > > No worries :-) and thanks for the detailed answers!
> > >
> > > I still have one question, though: you wrote that "The side input is
> > > considered ready when there has been any data output/added to the
> > > PCollection that it is being read as a side input. So the upstream
> > > trigger controls this." How does this work with side inputs that consist
> > > of multiple elements, i.e. ListPCollectionView and MapPCollectionView.
> > > For them, do we also consider the side input as ready once the first
> > > element arrives? That's why I was wondering about the triggers being
> > > responsible for deciding when a side input is ready.
> > >
> >
> > Yes, just as you describe. The side input window becomes ready once it
> > has any data. So, combining your items 2.5 and 3, you have a situation
> > where main input elements may be combined with only a speculative subset
> > of the side input data. They will not be reprocessed once more up-to-date
> > side input values become known. Beyond this initial period of waiting for
> > the very first firing of the side input window, there are no consistency
> > restrictions/guarantees on main input vs side input windows or
> > triggerings. It may be that for a given runner updating the side input
> > with the new value happens at high latency so all the main input elements
> > are processed and gone before the update goes through. It is a bit of a
> > dangerous area for users. I'm pretty interested in ideas in this space.
> >
> > Kenn
> >
>


Re: [DISCUSS] Adding Some Sort of SideInputRunner

2016-05-03 Thread Aljoscha Krettek
I'm afraid I have yet another question. What's the interplay between the
state that holds the buffered main-input elements and possible per-key
state that might be used by the DoFn? I guess I'm not seeing all the parts,
but my problem is that one part (the buffering) requires a different type
of state scope than the other part (key-scoped state access in the DoFn),
while they both seem to be using the same StateInternals from the step
context. How does that work?

Cheers,
Aljoscha

On Thu, 28 Apr 2016 at 20:05 Kenneth Knowles  wrote:

> On Thu, Apr 28, 2016 at 10:19 AM, Aljoscha Krettek 
> wrote:
>
> > No worries :-) and thanks for the detailed answers!
> >
> > I still have one question, though: you wrote that "The side input is
> > considered ready when there has been any data output/added to the
> > PCollection that it is being read as a side input. So the upstream trigger
> > controls this." How does this work with side inputs that consist of
> > multiple elements, i.e. ListPCollectionView and MapPCollectionView. For
> > them, do we also consider the side input as ready once the first element
> > arrives? That's why I was wondering about the triggers being responsible
> > for deciding when a side input is ready.
> >
>
> Yes, just as you describe. The side input window becomes ready once it has
> any data. So, combining your items 2.5 and 3, you have a situation where
> main input elements may be combined with only a speculative subset of the
> side input data. They will not be reprocessed once more up-to-date side
> input values become known. Beyond this initial period of waiting for the
> very first firing of the side input window, there are no consistency
> restrictions/guarantees on main input vs side input windows or triggerings.
> It may be that for a given runner updating the side input with the new
> value happens at high latency so all the main input elements are processed
> and gone before the update goes through. It is a bit of a dangerous area
> for users. I'm pretty interested in ideas in this space.
>
> Kenn
>


Re: [DISCUSS] Adding Some Sort of SideInputRunner

2016-04-28 Thread Kenneth Knowles
On Thu, Apr 28, 2016 at 10:19 AM, Aljoscha Krettek 
wrote:

> No worries :-) and thanks for the detailed answers!
>
> I still have one question, though: you wrote that "The side input is
> considered ready when there has been any data output/added to the
> PCollection that it is being read as a side input. So the upstream trigger
> controls this." How does this work with side inputs that consist of
> multiple elements, i.e. ListPCollectionView and MapPCollectionView. For
> them, do we also consider the side input as ready once the first element
> arrives? That's why I was wondering about the triggers being responsible
> for deciding when a side input is ready.
>

Yes, just as you describe. The side input window becomes ready once it has
any data. So, combining your items 2.5 and 3, you have a situation where
main input elements may be combined with only a speculative subset of the
side input data. They will not be reprocessed once more up-to-date side
input values become known. Beyond this initial period of waiting for the
very first firing of the side input window, there are no consistency
restrictions/guarantees on main input vs side input windows or triggerings.
It may be that, for a given runner, updating the side input with the new
value happens at such high latency that all the main input elements are
processed and gone before the update goes through. This is a somewhat
dangerous area for users. I'm pretty interested in ideas in this space.
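
To make the behavior above concrete, here is a minimal Java sketch. The
class and method names are hypothetical and windows are identified by plain
strings; it is not the StreamingSideInputDoFnRunner code. It assumes a
runner that buffers main-input elements until the matching side-input window
has fired at least once, and that never reprocesses elements when a later
firing updates the side input.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of "block until first firing, never reprocess".
final class SideInputBufferingSketch {
  // Latest known contents per side-input window.
  private final Map<String, List<String>> sideInput = new HashMap<>();
  // Main-input elements waiting for a side-input window that has not fired yet.
  private final Map<String, List<String>> blocked = new HashMap<>();
  final List<String> output = new ArrayList<>();

  /** Called for every firing (first or later) of a side-input window. */
  void onSideInputFiring(String sideWindow, List<String> contents) {
    sideInput.put(sideWindow, new ArrayList<>(contents));
    // Only elements that were still waiting get processed now.
    List<String> waiting = blocked.remove(sideWindow);
    if (waiting != null) {
      for (String element : waiting) {
        process(element, sideWindow);
      }
    }
  }

  /** Called for every main-input element, already mapped to its side-input window. */
  void onMainInput(String element, String sideWindow) {
    if (sideInput.containsKey(sideWindow)) {
      process(element, sideWindow);
    } else {
      blocked.computeIfAbsent(sideWindow, w -> new ArrayList<>()).add(element);
    }
  }

  private void process(String element, String sideWindow) {
    output.add(element + " saw " + sideInput.get(sideWindow));
  }

  public static void main(String[] args) {
    SideInputBufferingSketch runner = new SideInputBufferingSketch();
    runner.onMainInput("m1", "w0");                            // buffered, w0 not ready
    runner.onSideInputFiring("w0", Arrays.asList("s1"));       // first (speculative) firing
    runner.onSideInputFiring("w0", Arrays.asList("s1", "s2")); // update, m1 is not reprocessed
    runner.onMainInput("m2", "w0");                            // sees the updated contents
    System.out.println(runner.output);
  }
}
```

In this run m1 is combined with only the speculative subset of the side
input, while m2 sees the later contents.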

Kenn


Re: [DISCUSS] Adding Some Sort of SideInputRunner

2016-04-28 Thread Aljoscha Krettek
No worries :-) and thanks for the detailed answers!

I still have one question, though: you wrote that "The side input is
considered ready when there has been any data output/added to the
PCollection that it is being read as a side input. So the upstream trigger
controls this." How does this work with side inputs that consist of
multiple elements, e.g. ListPCollectionView and MapPCollectionView? For
them, do we also consider the side input as ready once the first element
arrives? That's why I was wondering about the triggers being responsible
for deciding when a side input is ready.

Cheers,
Aljoscha

On Thu, 28 Apr 2016 at 18:55 Kenneth Knowles  wrote:

> On Thu, Apr 28, 2016 at 1:26 AM, Aljoscha Krettek 
> wrote:
>
> > Bump.
> >
> > I'm afraid this might have gotten lost during the conferences/summits.
> >
> > On Thu, 21 Apr 2016 at 13:30 Aljoscha Krettek 
> wrote:
> >
> > > Ok, I'll try and start such a design. Before I can start, I have a few
> > > questions about how the side inputs actually work.  Some of it is in
> > > the docs, but some is also conjecture on my part about how
> > > MillWheel/Windmill (I don't know, is the first the system and the
> > > second a worker in there?) works.
> > >
> > > I'll write the questions as a set of assumptions and please correct me
> > > on those where I'm wrong:
> >
>
> Sorry for the delayed reply. I may have taken you too literally. Since
> nothing seemed wrong, I didn't say anything at all :-)
>
>
> >
> > > 1. Side inputs are always global/broadcast, they are never scoped to
> > > a key.
> >
>
> True. A side input is a way of reading/viewing an entire PCollection.
>
>
> > > 2. Mapping of main-input window to side-input window is done by the
> > > side-input WindowFn.
> >
>
> True.
>
>
> > > If the side input has a larger window than the main input, processing
> > > of main-input elements will block until the side input becomes
> > > available. (Side inputs for a larger side-input window can become
> > > available early if using a speculative trigger)
> >
>
> True to this point: The main input element waits for the necessary side
> input window to be ready. It doesn't necessarily have to do with window
> size. It could just be a scheduling thing, or other runner-specific reason.
>
>
> >
> > > 2.5 If we update the side input because a speculative trigger fires
> > > again we don't reprocess the main-input elements that were already
> > > processed. Only new elements see the updated side input.
> >
>
> True. This is a place where the decision is due mostly to feasibility of
> implementation. It is easy to create a pipeline where this behavior is not
> ideal.
>
>
> > > 3. The decision about whether side inputs are "available", i.e.
> > > complete in case of list/map side inputs is made by a Trigger. (This is
> > > also true for updates to side input caused by speculative triggers
> > > firing again.) This uses the continuation trigger feature, which is
> > > easy for time triggers and interesting for the
> > > AfterPane.elementCountAtLeast() trigger which changes to
> > > AfterPane.elementCountAtLeast(1) on continuation and other
> > > speculative/late triggers.
> >
>
> True up to this point: The side input is considered ready when there has
> been any data output/added to the PCollection that it is being read as a
> side input. So the upstream trigger controls this. I don't know if the
> continuation trigger is important. The goal of the continuation trigger is
> to handle an implementation detail: Once an upstream trigger has already
> regulated the flow, we want downstream stuff to just proceed as fast as
> reasonable.
>
>
> > > 4. About the StreamingSideInputDoFnRunner.java
> > > <https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/1e5524a7f5d0d774488cb0206ea6433085461775/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/StreamingSideInputDoFnRunner.java>
> > > posted by Kenneth. It uses StateInternals to buffer the elements of the
> > > main input in a BagState.  I was under the assumption that state is
> > > always scoped to a key but side inputs can also be used on non-keyed
> > > streams. In this case, the state is scoped to the key group (the
> > > unit/granularity that is used to rebalance in case of rescaling) and
> > > when we access the state we get all elements for the key groups for
> > > which our parallel worker instance is responsible. (This is the part
> > > where I am completely unsure about what is happening... :-O)
> >
>
> In this case, the key that is scoping the state is just the sharding key
> for the computation, just like you say. It may not be the actual key of a
> KV element, and the PCollection may not even be keyed, but the computation
> always has such a sharding.
>
> I believe replacing the runner-specific pieces of that code might be most
> of the way to what you want. And I think the runner-specific pieces are
> really just there because it didn't matter to make it more gener

Re: [DISCUSS] Adding Some Sort of SideInputRunner

2016-04-28 Thread Kenneth Knowles
On Thu, Apr 28, 2016 at 1:26 AM, Aljoscha Krettek 
wrote:

> Bump.
>
> I'm afraid this might have gotten lost during the conferences/summits.
>
> On Thu, 21 Apr 2016 at 13:30 Aljoscha Krettek  wrote:
>
> > Ok, I'll try and start such a design. Before I can start, I have a few
> > questions about how the side inputs actually work.  Some of it is in the
> > docs, but some is also conjecture on my part about how MillWheel/Windmill
> > (I don't know, is the first the system and the second a worker in there?)
> > works.
> >
> > I'll write the questions as a set of assumptions and please correct me on
> > those where I'm wrong:
>

Sorry for the delayed reply. I may have taken you too literally. Since
nothing seemed wrong, I didn't say anything at all :-)


>
> > 1. Side inputs are always global/broadcast, they are never scoped to a
> > key.
>

True. A side input is a way of reading/viewing an entire PCollection.


> > 2. Mapping of main-input window to side-input window is done by the
> > side-input WindowFn.
>

True.


> > If the side input has a larger window than the main
> > input, processing of main-input elements will block until the side input
> > becomes available. (Side inputs for a larger side-input window can become
> > available early if using a speculative trigger)
>

True to this point: The main input element waits for the necessary side
input window to be ready. It doesn't necessarily have to do with window
size. It could just be a scheduling thing, or other runner-specific reason.


>
> > 2.5 If we update the side input because a speculative trigger fires again
> > we don't reprocess the main-input elements that were already processed.
> > Only new elements see the updated side input.
>

True. This is a place where the decision is due mostly to feasibility of
implementation. It is easy to create a pipeline where this behavior is not
ideal.


> > 3. The decision about whether side inputs are "available", i.e. complete
> > in case of list/map side inputs is made by a Trigger. (This is also true
> > for updates to side input caused by speculative triggers firing again.)
> > This uses the continuation trigger feature, which is easy for time
> > triggers and interesting for the AfterPane.elementCountAtLeast() trigger
> > which changes to AfterPane.elementCountAtLeast(1) on continuation and
> > other speculative/late triggers.
>

True up to this point: The side input is considered ready when any data has
been output/added to the PCollection that is being read as a side input. So
the upstream trigger controls this. I don't know if the continuation trigger
is important. The goal of the continuation trigger is to handle an
implementation detail: once an upstream trigger has already regulated the
flow, we want downstream stuff to just proceed as fast as reasonable.
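
A tiny Java sketch of the continuation idea, for illustration only. The
ElementCountAtLeast class below is a hypothetical stand-in, not the SDK's
AfterPane trigger; it only models the behavior mentioned in this thread,
where an element-count trigger becomes "at least 1" on continuation so that
downstream stages fire as soon as anything arrives.

```java
// Hypothetical sketch; not the real trigger classes from the SDK.
final class ContinuationTriggerSketch {

  static final class ElementCountAtLeast {
    final int count;

    ElementCountAtLeast(int count) {
      this.count = count;
    }

    /** Fires once at least `count` elements have been seen. */
    boolean shouldFire(int elementsSeen) {
      return elementsSeen >= count;
    }

    /** Downstream of a grouping, the flow is already regulated: fire on any data. */
    ElementCountAtLeast continuation() {
      return new ElementCountAtLeast(1);
    }
  }

  public static void main(String[] args) {
    ElementCountAtLeast upstream = new ElementCountAtLeast(100);
    ElementCountAtLeast downstream = upstream.continuation();

    System.out.println(upstream.shouldFire(99));   // upstream still waiting
    System.out.println(downstream.shouldFire(1));  // downstream proceeds on first pane
  }
}
```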


> > 4. About the StreamingSideInputDoFnRunner.java
> > <https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/1e5524a7f5d0d774488cb0206ea6433085461775/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/StreamingSideInputDoFnRunner.java>
> > posted by Kenneth. It uses StateInternals to buffer the elements of the
> > main input in a BagState.  I was under the assumption that state is
> > always scoped to a key but side inputs can also be used on non-keyed
> > streams. In this case, the state is scoped to the key group (the
> > unit/granularity that is used to rebalance in case of rescaling) and when
> > we access the state we get all elements for the key groups for which our
> > parallel worker instance is responsible. (This is the part where I am
> > completely unsure about what is happening... :-O)
>

In this case, the key that is scoping the state is just the sharding key
for the computation, just like you say. It may not be the actual key of a
KV element, and the PCollection may not even be keyed, but the computation
always has such a sharding.

I believe replacing the runner-specific pieces of that code might be most
of the way to what you want. And I think the runner-specific pieces are
really just there because it didn't matter to make it more general, so it
will be easy to change. The upcoming in-process runner is developing some
abstractions in this area. But I can easily believe that there are other
ways of designing this. For example in batch mode we simply schedule the
side input first. Even in streaming style, the runner backend might be able
to take some more responsibility. So that code is just one point in the
design space.

Kenn


Re: [DISCUSS] Adding Some Sort of SideInputRunner

2016-04-28 Thread Aljoscha Krettek
Bump.

I'm afraid this might have gotten lost during the conferences/summits.

On Thu, 21 Apr 2016 at 13:30 Aljoscha Krettek  wrote:

> Ok, I'll try and start such a design. Before I can start, I have a few
> questions about how the side inputs actually work.  Some of it is in the
> docs, but some is also conjecture on my part about how MillWheel/Windmill
> (I don't know, is the first the system and the second a worker in there?)
> works.
>
> I'll write the questions as a set of assumptions and please correct me on
> those where I'm wrong:
>
> 1. Side inputs are always global/broadcast, they are never scoped to a key.
>
> 2. Mapping of main-input window to side-input window is done by the
> side-input WindowFn. If the side input has a larger window than the main
> input, processing of main-input elements will block until the side input
> becomes available. (Side inputs for a larger side-input window can become
> available early if using a speculative trigger)
>
> 2.5 If we update the side input because a speculative trigger fires again
> we don't reprocess the main-input elements that were already processed.
> Only new elements see the updated side input.
>
> 3. The decision about whether side inputs are "available", i.e. complete
> in case of list/map side inputs is made by a Trigger. (This is also true
> for updates to side input caused by speculative triggers firing again.)
> This uses the continuation trigger feature, which is easy for time triggers
> and interesting for the AfterPane.elementCountAtLeast() trigger which
> changes to AfterPane.elementCountAtLeast(1) on continuation and other
> speculative/late triggers.
>
> 4. About the StreamingSideInputDoFnRunner.java
> 
>  posted
> by Kenneth. It uses StateInternals to buffer the elements of the main input
> in a BagState.  I was under the assumption that state is always scoped to a
> key but side inputs can also be used on non-keyed streams. In this case,
> the state is scoped to the key group (the unit/granularity that is used to
> rebalance in case of rescaling) and when we access the state we get all
> elements for the key groups for which our parallel worker instance is
> responsible. (This is the part where I am completely unsure about what is
> happening... :-O)
>
> These are the ones I can come up with for now. :-)
>
> On Wed, 20 Apr 2016 at 23:25 Davor Bonaci 
> wrote:
>
>> If we come up with a general approach in the context of the Flink runner,
>> perhaps that piece can go back to the "runner-core" component and be
>> adopted more widely.
>>
>> On Wed, Apr 20, 2016 at 8:13 AM, Kenneth Knowles 
>> wrote:
>>
>> > Hi Aljoscha,
>> >
>> > Great idea.
>> >
>> >  - The logic for matching up the windows is WindowFn#getSideInputWindow
>> > [1]
>> >  - The SDK used to have something along the lines of what you describe
>> > [2] but we thought it was too runner-specific, directly referencing
>> > Dataflow details, and with a particular model of buffering + timer.
>> > Perhaps it is a starting place for your design?
>> >
>> > Kenn
>> >
>> > [1]
>> >
>> >
>> https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java#L131
>> >
>> > [2]
>> >
>> >
>> https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/1e5524a7f5d0d774488cb0206ea6433085461775/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/StreamingSideInputDoFnRunner.java
>> >
>> > On Wed, Apr 20, 2016 at 4:25 AM, Jean-Baptiste Onofré 
>> > wrote:
>> >
>> > > Hi Aljoscha
>> > >
>> > > AFAIR, the Runner API Proposal document (from Kenneth) contains some
>> > > points about side input.
>> > >
>> > >
>> > >
>> >
>> https://drive.google.com/folderview?id=0B-IhJZh9Ab52OFBVZHpsNjc4eXc&usp=sharing
>> > >
>> > > I don't think it goes into the details of side inputs and windows, but
>> > > definitely the document we should extend.
>> > >
>> > > Regards
>> > > JB
>> > >
>> > >
>> > >
>> > > On 04/20/2016 11:55 AM, Aljoscha Krettek wrote:
>> > >
>> > >> Hi,
>> > >> for https://issues.apache.org/jira/browse/BEAM-102 we will need to
>> > >> have some functionality that deals with side inputs and windows (of
>> > >> both the main input and the side inputs) and how they get matched and
>> > >> how we wait for windows (blocking). I imagine that we could add some
>> > >> component that is similar to ReduceFnRunner but for side inputs: We
>> > >> would just instantiate it with a factory for state storage, then push
>> > >> elements into it while processing and it would provide a way to get a
>> > >> SideInputReader.
>> > >>
>> > >> I think this would not be specific to the Flink runner because other
>> > >> runner
>> > >> implementors will face similar problems. Are there any ideas/design
>> docs
>> > >> 

Re: [DISCUSS] Adding Some Sort of SideInputRunner

2016-04-21 Thread Aljoscha Krettek
Ok, I'll try and start such a design. Before I can start, I have a few
questions about how the side inputs actually work.  Some of it is in the
docs, but some is also conjecture on my part about how MillWheel/Windmill
(I don't know, is the first the system and the second a worker in there?)
works.

I'll write the questions as a set of assumptions and please correct me on
those where I'm wrong:

1. Side inputs are always global/broadcast, they are never scoped to a key.

2. Mapping of main-input window to side-input window is done by the
side-input WindowFn. If the side input has a larger window than the main
input, processing of main-input elements will block until the side input
becomes available. (Side inputs for a larger side-input window can become
available early if using a speculative trigger)

2.5 If we update the side input because a speculative trigger fires again
we don't reprocess the main-input elements that were already processed.
Only new elements see the updated side input.

3. The decision about whether side inputs are "available" (i.e. complete, in
the case of list/map side inputs) is made by a Trigger. (This is also true
for updates to the side input caused by speculative triggers firing again.)
This uses the continuation trigger feature, which is easy for time triggers
and interesting for the AfterPane.elementCountAtLeast() trigger, which
changes to AfterPane.elementCountAtLeast(1) on continuation and other
speculative/late triggers.

4. About the StreamingSideInputDoFnRunner.java

posted
by Kenneth. It uses StateInternals to buffer the elements of the main input
in a BagState.  I was under the assumption that state is always scoped to a
key but side inputs can also be used on non-keyed streams. In this case,
the state is scoped to the key group (the unit/granularity that is used to
rebalance in case of rescaling) and when we access the state we get all
elements for the key groups for which our parallel worker instance is
responsible. (This is the part where I am completely unsure about what is
happening... :-O)

These are the ones I can come up with for now. :-)

On Wed, 20 Apr 2016 at 23:25 Davor Bonaci  wrote:

> If we come up with a general approach in the context of the Flink runner,
> perhaps that piece can go back to the "runner-core" component and be
> adopted more widely.
>
> On Wed, Apr 20, 2016 at 8:13 AM, Kenneth Knowles 
> wrote:
>
> > Hi Aljoscha,
> >
> > Great idea.
> >
> >  - The logic for matching up the windows is WindowFn#getSideInputWindow
> > [1]
> >  - The SDK used to have something along the lines of what you describe
> > [2] but we thought it was too runner-specific, directly referencing
> > Dataflow details, and with a particular model of buffering + timer.
> > Perhaps it is a starting place for your design?
> >
> > Kenn
> >
> > [1]
> >
> >
> https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java#L131
> >
> > [2]
> >
> >
> https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/1e5524a7f5d0d774488cb0206ea6433085461775/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/StreamingSideInputDoFnRunner.java
> >
> > On Wed, Apr 20, 2016 at 4:25 AM, Jean-Baptiste Onofré 
> > wrote:
> >
> > > Hi Aljoscha
> > >
> > > AFAIR, the Runner API Proposal document (from Kenneth) contains some
> > > points about side input.
> > >
> > >
> > >
> >
> https://drive.google.com/folderview?id=0B-IhJZh9Ab52OFBVZHpsNjc4eXc&usp=sharing
> > >
> > > I don't think it goes into the details of side inputs and windows, but
> > > definitely the document we should extend.
> > >
> > > Regards
> > > JB
> > >
> > >
> > >
> > > On 04/20/2016 11:55 AM, Aljoscha Krettek wrote:
> > >
> > >> Hi,
> > > >> for https://issues.apache.org/jira/browse/BEAM-102 we will need to
> > > >> have some functionality that deals with side inputs and windows (of
> > > >> both the main input and the side inputs) and how they get matched and
> > > >> how we wait for windows (blocking). I imagine that we could add some
> > > >> component that is similar to ReduceFnRunner but for side inputs: We
> > > >> would just instantiate it with a factory for state storage, then push
> > > >> elements into it while processing and it would provide a way to get a
> > > >> SideInputReader.
> > > >>
> > > >> I think this would not be specific to the Flink runner because other
> > > >> runner implementors will face similar problems. Are there any
> > > >> ideas/design docs about such a thing already? If not, we should
> > > >> probably start designing.
> > >>
> > >> What do you think?
> > >>
> > >> Cheers,
> > >> Aljoscha
> > >>
> > >>
> > > --
> > > Jean-Baptiste Onofré
> > > jbono...@apache.org
> > > http://blog.nanthrax.net
> > > Talend - http://www.talend.com
> > >

Re: [DISCUSS] Adding Some Sort of SideInputRunner

2016-04-20 Thread Davor Bonaci
If we come up with a general approach in the context of the Flink runner,
perhaps that piece can go back to the "runner-core" component and be
adopted more widely.

On Wed, Apr 20, 2016 at 8:13 AM, Kenneth Knowles 
wrote:

> Hi Aljoscha,
>
> Great idea.
>
>  - The logic for matching up the windows is WindowFn#getSideInputWindow [1]
>  - The SDK used to have something along the lines of what you describe [2]
> but we thought it was too runner-specific, directly referencing Dataflow
> details, and with a particular model of buffering + timer. Perhaps it is a
> starting place for your design?
>
> Kenn
>
> [1]
>
> https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java#L131
>
> [2]
>
> https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/1e5524a7f5d0d774488cb0206ea6433085461775/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/StreamingSideInputDoFnRunner.java
>
> On Wed, Apr 20, 2016 at 4:25 AM, Jean-Baptiste Onofré 
> wrote:
>
> > Hi Aljoscha
> >
> > AFAIR, the Runner API Proposal document (from Kenneth) contains some
> > points about side input.
> >
> >
> >
> https://drive.google.com/folderview?id=0B-IhJZh9Ab52OFBVZHpsNjc4eXc&usp=sharing
> >
> > I don't think it goes into the details of side inputs and windows, but
> > definitely the document we should extend.
> >
> > Regards
> > JB
> >
> >
> >
> > On 04/20/2016 11:55 AM, Aljoscha Krettek wrote:
> >
> >> Hi,
> >> for https://issues.apache.org/jira/browse/BEAM-102 we will need to have
> >> some functionality that deals with side inputs and windows (of both the
> >> main input and the side inputs) and how they get matched and how we wait
> >> for windows (blocking). I imagine that we could add some component that
> is
> >> similar to ReduceFnRunner but for side inputs: We would just instantiate
> >> it
> >> with a factory for state storage, then push elements into it while
> >> processing and it would provide a way to get a SideInputReader.
> >>
> >> I think this would not be specific to the Flink runner because other runner
> >> implementors will face similar problems. Are there any ideas/design docs
> >> about such a thing already? If not, we should probably start designing.
> >>
> >> What do you think?
> >>
> >> Cheers,
> >> Aljoscha
> >>
> >>
> > --
> > Jean-Baptiste Onofré
> > jbono...@apache.org
> > http://blog.nanthrax.net
> > Talend - http://www.talend.com
> >
>


Re: [DISCUSS] Adding Some Sort of SideInputRunner

2016-04-20 Thread Kenneth Knowles
Hi Aljoscha,

Great idea.

 - The logic for matching up the windows is WindowFn#getSideInputWindow [1]
 - The SDK used to have something along the lines of what you describe [2]
but we thought it was too runner-specific, directly referencing Dataflow
details, and with a particular model of buffering + timer. Perhaps it is a
starting place for your design?

Kenn

[1]
https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java#L131

[2]
https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/1e5524a7f5d0d774488cb0206ea6433085461775/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/StreamingSideInputDoFnRunner.java
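
To make the window-matching step concrete, here is a minimal sketch of the kind of mapping WindowFn#getSideInputWindow performs. It assumes fixed (partitioning) windows on the side input and assumes the mapping goes through the main window's maximum timestamp; the class and method names (WindowMappingSketch, Interval, sideInputWindow) are hypothetical and not part of the SDK.

```java
/** Hypothetical sketch: windows are half-open millisecond intervals [start, end). */
final class WindowMappingSketch {

  static final class Interval {
    final long start;
    final long end;

    Interval(long start, long end) {
      if (start >= end) {
        throw new IllegalArgumentException("start must be before end");
      }
      this.start = start;
      this.end = end;
    }

    /** Last timestamp that still belongs to this window. */
    long maxTimestamp() {
      return end - 1;
    }
  }

  /**
   * Assumption: the side input uses fixed windows of the given size, aligned at 0.
   * The main-input window is mapped to the side-input window that contains the
   * main window's maximum timestamp.
   */
  static Interval sideInputWindow(Interval mainWindow, long sideWindowSizeMillis) {
    // floorDiv keeps the alignment correct for negative timestamps as well.
    long start =
        Math.floorDiv(mainWindow.maxTimestamp(), sideWindowSizeMillis) * sideWindowSizeMillis;
    return new Interval(start, start + sideWindowSizeMillis);
  }
}
```

With one-minute main windows and one-hour side-input windows, every main window inside the first hour maps to the side-input window [0, 3600000), so a runner only has to wait for that single side-input window before processing those elements.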

On Wed, Apr 20, 2016 at 4:25 AM, Jean-Baptiste Onofré wrote:

> Hi Aljoscha
>
> AFAIR, the Runner API Proposal document (from Kenneth) contains some
> points about side input.
>
>
> https://drive.google.com/folderview?id=0B-IhJZh9Ab52OFBVZHpsNjc4eXc&usp=sharing
>
> I don't think it goes into the details of side inputs and windows, but
> definitely the document we should extend.
>
> Regards
> JB
>
>
>
> On 04/20/2016 11:55 AM, Aljoscha Krettek wrote:
>
>> Hi,
>> for https://issues.apache.org/jira/browse/BEAM-102 we will need to have
>> some functionality that deals with side inputs and windows (of both the
>> main input and the side inputs) and how they get matched and how we wait
>> for windows (blocking). I imagine that we could add some component that is
>> similar to ReduceFnRunner but for side inputs: We would just instantiate it
>> with a factory for state storage, then push elements into it while
>> processing and it would provide a way to get a SideInputReader.
>>
>> I think this would not be specific to the Flink runner because other runner
>> implementors will face similar problems. Are there any ideas/design docs
>> about such a thing already? If not, we should probably start designing.
>>
>> What do you think?
>>
>> Cheers,
>> Aljoscha
>>
>>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>


Re: [DISCUSS] Adding Some Sort of SideInputRunner

2016-04-20 Thread Jean-Baptiste Onofré

Hi Aljoscha

AFAIR, the Runner API Proposal document (from Kenneth) contains some 
points about side input.


https://drive.google.com/folderview?id=0B-IhJZh9Ab52OFBVZHpsNjc4eXc&usp=sharing

I don't think it goes into the details of side inputs and windows, but
it is definitely the document we should extend.


Regards
JB


On 04/20/2016 11:55 AM, Aljoscha Krettek wrote:

Hi,
for https://issues.apache.org/jira/browse/BEAM-102 we will need to have
some functionality that deals with side inputs and windows (of both the
main input and the side inputs) and how they get matched and how we wait
for windows (blocking). I imagine that we could add some component that is
similar to ReduceFnRunner but for side inputs: We would just instantiate it
with a factory for state storage, then push elements into it while
processing and it would provide a way to get a SideInputReader.

I think this would not be specific to the Flink runner because other runner
implementors will face similar problems. Are there any ideas/design docs
about such a thing already? If not, we should probably start designing.

What do you think?

Cheers,
Aljoscha



--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com


[DISCUSS] Adding Some Sort of SideInputRunner

2016-04-20 Thread Aljoscha Krettek
Hi,
for https://issues.apache.org/jira/browse/BEAM-102 we will need to have
some functionality that deals with side inputs and windows (of both the
main input and the side inputs) and how they get matched and how we wait
for windows (blocking). I imagine that we could add some component that is
similar to ReduceFnRunner but for side inputs: We would just instantiate it
with a factory for state storage, then push elements into it while
processing and it would provide a way to get a SideInputReader.

I think this would not be specific to the Flink runner because other runner
implementors will face similar problems. Are there any ideas/design docs
about such a thing already? If not, we should probably start designing.

What do you think?

Cheers,
Aljoscha
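
As a rough illustration of the component described above, the following is a minimal sketch, not a proposed design. Everything in it is an assumption made for illustration: the class name SideInputBufferSketch, the use of plain strings as window identifiers, a Supplier of a Map standing in for the "factory for state storage", and a BiConsumer standing in for the DoFn. Real Beam types such as StateInternals or SideInputReader are deliberately not used.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

/** Hypothetical sketch of a runner that blocks main-input elements on side-input windows. */
final class SideInputBufferSketch<T, S> {
  // Main-input elements waiting for a side-input window that is not ready yet.
  private final Map<String, List<T>> buffered;
  // Side-input contents per window, once they have become available.
  private final Map<String, S> ready = new HashMap<>();
  // Stand-in for the DoFn: receives an element together with its side-input contents.
  private final BiConsumer<T, S> processFn;

  SideInputBufferSketch(
      Supplier<Map<String, List<T>>> stateFactory, BiConsumer<T, S> processFn) {
    this.buffered = stateFactory.get();
    this.processFn = processFn;
  }

  /** Processes the element right away if its side-input window is ready, else buffers it. */
  void processElement(T element, String sideInputWindow) {
    if (ready.containsKey(sideInputWindow)) {
      processFn.accept(element, ready.get(sideInputWindow));
    } else {
      buffered.computeIfAbsent(sideInputWindow, w -> new ArrayList<>()).add(element);
    }
  }

  /** Marks a side-input window as ready and flushes the elements that were waiting on it. */
  void sideInputReady(String window, S contents) {
    ready.put(window, contents);
    List<T> pending = buffered.remove(window);
    if (pending != null) {
      for (T element : pending) {
        processFn.accept(element, contents);
      }
    }
  }

  /** Number of main-input elements currently held back. */
  int bufferedCount() {
    int count = 0;
    for (List<T> elements : buffered.values()) {
      count += elements.size();
    }
    return count;
  }
}
```

A runner would call processElement for each main-input element and sideInputReady whenever a side-input window becomes available. The sketch leaves out how the side-input window is chosen for a main-input window and how the buffer interacts with key-scoped state.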