Re: Main input element accesses expired side input windows

2018-03-08 Thread Shen Li
Thank you, Kenn!

Shen

On Thu, Mar 8, 2018 at 9:58 PM, Kenneth Knowles  wrote:

>
>
> On Thu, Mar 8, 2018 at 6:50 PM Shen Li  wrote:
>
>> Hi Kenn,
>>
>> I just want to confirm that I understand it correctly.
>>
>> >  - You know that W is expired only when you can be sure that no main
>> input element could reference it.
>>
>> This is determined by the *main input* watermark, allowedLateness, and
>> maximumLookback, right?
>>
>> https://github.com/apache/beam/blob/master/sdks/java/
>> core/src/main/java/org/apache/beam/sdk/transforms/windowing/
>> WindowMappingFn.java#L68
>>
>
> Yes, I think you can use this formula: https://github.com/
> apache/beam/blob/master/sdks/java/core/src/main/java/org/
> apache/beam/sdk/transforms/windowing/WindowMappingFn.java#L61
>
>
>
>> > when W expires on the side input you make it ready, you process the
>> elements with empty contents on the side input, then you GC the side input.
>>
>> Even if W is unready according to *side input* watermark, the
>> runner/engine should still make it ready when it violates maximumLookback
>> and *main input* watermark. Is that correct?
>>
>
> This is true if there are no buffered elements. Then you can be sure that
> no main input element will show up that accesses W.
>
> Kenn
>
>
>>
>> Thanks,
>> Shen
>>
>>
>>
>>
>>
>> On Thu, Mar 8, 2018 at 9:31 PM, Shen Li  wrote:
>>
>>> I see. Thank you Kenn and Lukasz.
>>>
>>> Best,
>>> Shen
>>>
>>>
>>> On Thu, Mar 8, 2018 at 7:46 PM, Kenneth Knowles  wrote:
>>>
>>>> I think the description of when a side input is ready vs expired is the
>>>> trouble here.
>>>>
>>>>  - You know that W is expired only when you can be sure that no main
>>>> input element could reference it.
>>>>  - You know that W is ready *even if it got no data* if the input that
>>>> would end up in W would be dropped (aka when W expires according to the
>>>> *side input* watermark)
>>>>
>>>> So for your scenario, you push back the elements, that holds W from
>>>> being collected, when W expires on the side input you make it ready, you
>>>> process the elements with empty contents on the side input, then you GC the
>>>> side input.
>>>>
>>>> Kenn
>>>>
>>>> On Thu, Mar 8, 2018 at 4:32 PM Shen Li  wrote:
>>>>
>>>>> Hi Lukasz,
>>>>>
>>>>> Let's explain this problem using a specific example.
>>>>>
>>>>> Say I have a main input element X, which accesses side input window W.
>>>>> When X arrives at a ParDo operator, W is not ready and not expired either.
>>>>> So, in this case, the ParDo should push back X and wait for W to become
>>>>> ready. Say, after two minutes, W is still unready but is expired due to
>>>>> advanced main input watermark. In this situation, how does Beam expect
>>>>> runners/engines to handle the pushed back value X? Discard X or throw an
>>>>> error?
>>>>>
>>>>> Thanks,
>>>>> Shen
>>>>>
>>>>> On Thu, Mar 8, 2018 at 6:35 PM, Lukasz Cwik  wrote:
>>>>>
>>>>>> I believe you're missing this point: "and also to not expire the
>>>>>> side input till the main input watermark advances beyond the garbage
>>>>>> collection hold of the side input."
>>>>>>
>>>>>> On Thu, Mar 8, 2018 at 3:33 PM, Shen Li  wrote:
>>>>>>
>>>>>>> Hi Lukasz,
>>>>>>>
>>>>>>> Thanks again.
>>>>>>>
>>>>>>> >  the runner is required to hold back the main input till the side
>>>>>>> input is ready
>>>>>>>
>>>>>>> Yes, I understand these requirements. But what if the side input
>>>>>>> expires before it becomes ready?
>>>>>>>
>>>>>>> Shen
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>
>>


Re: Main input element accesses expired side input windows

2018-03-08 Thread Kenneth Knowles
On Thu, Mar 8, 2018 at 6:50 PM Shen Li  wrote:

> Hi Kenn,
>
> I just want to confirm that I understand it correctly.
>
> >  - You know that W is expired only when you can be sure that no main
> input element could reference it.
>
> This is determined by the *main input* watermark, allowedLateness, and
> maximumLookback, right?
>
>
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowMappingFn.java#L68
>

Yes, I think you can use this formula:
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowMappingFn.java#L61
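
As a rough illustration of that formula, here is a minimal sketch. It assumes the rule documented at the linked line is that a side input window becomes unreachable once the main input watermark passes (end of side input window) + (maximum lookback) + (main input allowed lateness). The function names and the use of epoch milliseconds are hypothetical, and this is plain Python rather than Beam SDK code.

```python
def unreachable_after(side_window_end_ms: int,
                      maximum_lookback_ms: int,
                      main_allowed_lateness_ms: int) -> int:
    """Timestamp after which no main input window can still map to the side input window."""
    return side_window_end_ms + maximum_lookback_ms + main_allowed_lateness_ms


def is_expired(main_input_watermark_ms: int,
               side_window_end_ms: int,
               maximum_lookback_ms: int,
               main_allowed_lateness_ms: int) -> bool:
    """True once the main input watermark has strictly passed the unreachable timestamp."""
    limit = unreachable_after(side_window_end_ms,
                              maximum_lookback_ms,
                              main_allowed_lateness_ms)
    return main_input_watermark_ms > limit
```

Note that the side input watermark does not appear anywhere in this calculation; only the main input side determines expiry.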



> > when W expires on the side input you make it ready, you process the
> elements with empty contents on the side input, then you GC the side input.
>
> Even if W is unready according to *side input* watermark, the
> runner/engine should still make it ready when it violates maximumLookback
> and *main input* watermark. Is that correct?
>

This is true if there are no buffered elements. Then you can be sure that
no main input element will show up that accesses W.

Kenn


>
> Thanks,
> Shen
>
>
>
>
>
> On Thu, Mar 8, 2018 at 9:31 PM, Shen Li  wrote:
>
>> I see. Thank you Kenn and Lukasz.
>>
>> Best,
>> Shen
>>
>>
>> On Thu, Mar 8, 2018 at 7:46 PM, Kenneth Knowles  wrote:
>>
>>> I think the description of when a side input is ready vs expired is the
>>> trouble here.
>>>
>>>  - You know that W is expired only when you can be sure that no main
>>> input element could reference it.
>>>  - You know that W is ready *even if it got no data* if the input that
>>> would end up in W would be dropped (aka when W expires according to the
>>> *side input* watermark)
>>>
>>> So for your scenario, you push back the elements, that holds W from
>>> being collected, when W expires on the side input you make it ready, you
>>> process the elements with empty contents on the side input, then you GC the
>>> side input.
>>>
>>> Kenn
>>>
>>> On Thu, Mar 8, 2018 at 4:32 PM Shen Li  wrote:
>>>
>>>> Hi Lukasz,
>>>>
>>>> Let's explain this problem using a specific example.
>>>>
>>>> Say I have a main input element X, which accesses side input window W.
>>>> When X arrives at a ParDo operator, W is not ready and not expired either.
>>>> So, in this case, the ParDo should push back X and wait for W to become
>>>> ready. Say, after two minutes, W is still unready but is expired due to
>>>> advanced main input watermark. In this situation, how does Beam expect
>>>> runners/engines to handle the pushed back value X? Discard X or throw an
>>>> error?
>>>>
>>>> Thanks,
>>>> Shen
>>>>
>>>> On Thu, Mar 8, 2018 at 6:35 PM, Lukasz Cwik  wrote:
>>>>
>>>>> I believe you're missing this point: "and also to not expire the
>>>>> side input till the main input watermark advances beyond the garbage
>>>>> collection hold of the side input."
>>>>>
>>>>> On Thu, Mar 8, 2018 at 3:33 PM, Shen Li  wrote:
>>>>>
>>>>>> Hi Lukasz,
>>>>>>
>>>>>> Thanks again.
>>>>>>
>>>>>> >  the runner is required to hold back the main input till the side
>>>>>> input is ready
>>>>>>
>>>>>> Yes, I understand these requirements. But what if the side input
>>>>>> expires before it becomes ready?
>>>>>>
>>>>>> Shen
>>>>>>
>>>>>>
>>>>>

>>
>


Re: Main input element accesses expired side input windows

2018-03-08 Thread Shen Li
Hi Kenn,

I just want to confirm that I understand it correctly.

>  - You know that W is expired only when you can be sure that no main
input element could reference it.

This is determined by the *main input* watermark, allowedLateness, and
maximumLookback, right?

https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowMappingFn.java#L68

> when W expires on the side input you make it ready, you process the
elements with empty contents on the side input, then you GC the side input.

Even if W is unready according to *side input* watermark, the runner/engine
should still make it ready when it violates maximumLookback and *main
input* watermark. Is that correct?

Thanks,
Shen





On Thu, Mar 8, 2018 at 9:31 PM, Shen Li  wrote:

> I see. Thank you Kenn and Lukasz.
>
> Best,
> Shen
>
>
> On Thu, Mar 8, 2018 at 7:46 PM, Kenneth Knowles  wrote:
>
>> I think the description of when a side input is ready vs expired is the
>> trouble here.
>>
>>  - You know that W is expired only when you can be sure that no main
>> input element could reference it.
>>  - You know that W is ready *even if it got no data* if the input that
>> would end up in W would be dropped (aka when W expires according to the
>> *side input* watermark)
>>
>> So for your scenario, you push back the elements, that holds W from being
>> collected, when W expires on the side input you make it ready, you process
>> the elements with empty contents on the side input, then you GC the side
>> input.
>>
>> Kenn
>>
>> On Thu, Mar 8, 2018 at 4:32 PM Shen Li  wrote:
>>
>>> Hi Lukasz,
>>>
>>> Let's explain this problem using a specific example.
>>>
>>> Say I have a main input element X, which accesses side input window W.
>>> When X arrives at a ParDo operator, W is not ready and not expired either.
>>> So, in this case, the ParDo should push back X and wait for W to become
>>> ready. Say, after two minutes, W is still unready but is expired due to
>>> advanced main input watermark. In this situation, how does Beam expect
>>> runners/engines to handle the pushed back value X? Discard X or throw an
>>> error?
>>>
>>> Thanks,
>>> Shen
>>>
>>> On Thu, Mar 8, 2018 at 6:35 PM, Lukasz Cwik  wrote:
>>>
>>>> I believe you're missing this point: "and also to not expire the
>>>> side input till the main input watermark advances beyond the garbage
>>>> collection hold of the side input."
>>>>
>>>> On Thu, Mar 8, 2018 at 3:33 PM, Shen Li  wrote:
>>>>
>>>>> Hi Lukasz,
>>>>>
>>>>> Thanks again.
>>>>>
>>>>> >  the runner is required to hold back the main input till the side
>>>>> input is ready
>>>>>
>>>>> Yes, I understand these requirements. But what if the side input
>>>>> expires before it becomes ready?
>>>>>
>>>>> Shen
>>>>>
>>>>>

>>>
>


Re: Main input element accesses expired side input windows

2018-03-08 Thread Shen Li
I see. Thank you Kenn and Lukasz.

Best,
Shen


On Thu, Mar 8, 2018 at 7:46 PM, Kenneth Knowles  wrote:

> I think the description of when a side input is ready vs expired is the
> trouble here.
>
>  - You know that W is expired only when you can be sure that no main input
> element could reference it.
>  - You know that W is ready *even if it got no data* if the input that
> would end up in W would be dropped (aka when W expires according to the
> *side input* watermark)
>
> So for your scenario, you push back the elements, that holds W from being
> collected, when W expires on the side input you make it ready, you process
> the elements with empty contents on the side input, then you GC the side
> input.
>
> Kenn
>
> On Thu, Mar 8, 2018 at 4:32 PM Shen Li  wrote:
>
>> Hi Lukasz,
>>
>> Let's explain this problem using a specific example.
>>
>> Say I have a main input element X, which accesses side input window W.
>> When X arrives at a ParDo operator, W is not ready and not expired either.
>> So, in this case, the ParDo should push back X and wait for W to become
>> ready. Say, after two minutes, W is still unready but is expired due to
>> advanced main input watermark. In this situation, how does Beam expect
>> runners/engines to handle the pushed back value X? Discard X or throw an
>> error?
>>
>> Thanks,
>> Shen
>>
>> On Thu, Mar 8, 2018 at 6:35 PM, Lukasz Cwik  wrote:
>>
>>> I believe you're missing this point: "and also to not expire the
>>> side input till the main input watermark advances beyond the garbage
>>> collection hold of the side input."
>>>
>>> On Thu, Mar 8, 2018 at 3:33 PM, Shen Li  wrote:
>>>
>>>> Hi Lukasz,
>>>>
>>>> Thanks again.
>>>>
>>>> >  the runner is required to hold back the main input till the side
>>>> input is ready
>>>>
>>>> Yes, I understand these requirements. But what if the side input
>>>> expires before it becomes ready?
>>>>
>>>> Shen


>>>
>>


Re: Main input element accesses expired side input windows

2018-03-08 Thread Kenneth Knowles
I think the description of when a side input is ready vs expired is the
trouble here.

 - You know that W is expired only when you can be sure that no main input
element could reference it.
 - You know that W is ready *even if it got no data* if the input that
would end up in W would be dropped (aka when W expires according to the
*side input* watermark)

So for your scenario: you push back the elements, which holds W from being
collected. When W expires on the side input, you make it ready, process
the elements with empty contents on the side input, and then GC the side
input.
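
The readiness rule and the scenario above could be sketched roughly as follows. This is a simplified, runner-agnostic illustration in plain Python, not Beam SDK code: all names are hypothetical, timestamps are epoch milliseconds, and "input that would end up in W would be dropped" is assumed to mean the side input watermark has passed the end of W plus the side input's allowed lateness.

```python
from typing import Any, List, Sequence, Tuple


def side_window_ready(received_data: bool,
                      side_watermark_ms: int,
                      window_end_ms: int,
                      side_allowed_lateness_ms: int) -> bool:
    """W is ready if it has data, or if any further data for W would be dropped."""
    too_late_for_data = side_watermark_ms > window_end_ms + side_allowed_lateness_ms
    return received_data or too_late_for_data


def release_pushed_back(pushed_back: Sequence[Any],
                        side_contents: Sequence[Any],
                        ready: bool) -> List[Tuple[Any, List[Any]]]:
    """Pair each held element with W's contents (possibly empty) once W is ready."""
    if not ready:
        return []  # W is not ready yet, so the elements stay pushed back
    return [(element, list(side_contents)) for element in pushed_back]
```

With no data in W and the side input watermark past the cutoff, `release_pushed_back(["X"], [], True)` pairs X with an empty side input rather than discarding it or raising an error.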

Kenn

On Thu, Mar 8, 2018 at 4:32 PM Shen Li  wrote:

> Hi Lukasz,
>
> Let's explain this problem using a specific example.
>
> Say I have a main input element X, which accesses side input window W.
> When X arrives at a ParDo operator, W is not ready and not expired either.
> So, in this case, the ParDo should push back X and wait for W to become
> ready. Say, after two minutes, W is still unready but is expired due to
> advanced main input watermark. In this situation, how does Beam expect
> runners/engines to handle the pushed back value X? Discard X or throw an
> error?
>
> Thanks,
> Shen
>
> On Thu, Mar 8, 2018 at 6:35 PM, Lukasz Cwik  wrote:
>
>> I believe you're missing this point: "and also to not expire the side
>> input till the main input watermark advances beyond the garbage collection
>> hold of the side input."
>>
>> On Thu, Mar 8, 2018 at 3:33 PM, Shen Li  wrote:
>>
>>> Hi Lukasz,
>>>
>>> Thanks again.
>>>
>>> >  the runner is required to hold back the main input till the side
>>> input is ready
>>>
>>> Yes, I understand these requirements. But what if the side input expires
>>> before it becomes ready?
>>>
>>> Shen
>>>
>>>
>>
>


Re: Main input element accesses expired side input windows

2018-03-08 Thread Shen Li
Hi Lukasz,

Let's explain this problem using a specific example.

Say I have a main input element X, which accesses side input window W. When
X arrives at a ParDo operator, W is not ready and not expired either. So,
in this case, the ParDo should push back X and wait for W to become ready.
Say, after two minutes, W is still unready but is expired due to advanced
main input watermark. In this situation, how does Beam expect
runners/engines to handle the pushed back value X? Discard X or throw an
error?

Thanks,
Shen

On Thu, Mar 8, 2018 at 6:35 PM, Lukasz Cwik  wrote:

> I believe you're missing this point: "and also to not expire the side
> input till the main input watermark advances beyond the garbage collection
> hold of the side input."
>
> On Thu, Mar 8, 2018 at 3:33 PM, Shen Li  wrote:
>
>> Hi Lukasz,
>>
>> Thanks again.
>>
>> >  the runner is required to hold back the main input till the side
>> input is ready
>>
>> Yes, I understand these requirements. But what if the side input expires
>> before it becomes ready?
>>
>> Shen
>>
>>
>


Re: Main input element accesses expired side input windows

2018-03-08 Thread Lukasz Cwik
I believe you're missing this point: "and also to not expire the side
input till the main input watermark advances beyond the garbage collection
hold of the side input."

On Thu, Mar 8, 2018 at 3:33 PM, Shen Li  wrote:

> Hi Lukasz,
>
> Thanks again.
>
> >  the runner is required to hold back the main input till the side input
> is ready
>
> Yes, I understand these requirements. But what if the side input expires
> before it becomes ready?
>
> Shen
>
>


Re: Main input element accesses expired side input windows

2018-03-08 Thread Shen Li
Hi Lukasz,

Thanks again.

>  the runner is required to hold back the main input till the side input
is ready

Yes, I understand these requirements. But what if the side input expires
before it becomes ready?

Shen


Re: Main input element accesses expired side input windows

2018-03-08 Thread Lukasz Cwik
Neither. The runner is required to hold back the main input till the side
input is ready and also to not expire the side input till the main input
watermark advances beyond the garbage collection hold of the side input.
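
A minimal sketch of that contract, assuming a runner keeps pushed back elements per side input window and knows each window's garbage collection hold as a timestamp. The class, its methods, and the millisecond timestamps are hypothetical, and this is plain Python rather than any runner's actual implementation.

```python
from collections import defaultdict
from typing import Any, Dict, Hashable, List


class PushbackBuffer:
    """Holds main input elements whose side input window is not ready yet."""

    def __init__(self) -> None:
        self._held: Dict[Hashable, List[Any]] = defaultdict(list)

    def push_back(self, side_window: Hashable, element: Any) -> None:
        """Hold an element until its side input window becomes ready."""
        self._held[side_window].append(element)

    def release(self, side_window: Hashable) -> List[Any]:
        """Return and forget the elements waiting on a window that is now ready."""
        return self._held.pop(side_window, [])

    def can_collect(self, side_window: Hashable,
                    main_watermark_ms: int, gc_hold_ms: int) -> bool:
        """A window may be collected only when nothing is held for it and the
        main input watermark has advanced beyond its garbage collection hold."""
        nothing_held = not self._held.get(side_window)
        return nothing_held and main_watermark_ms > gc_hold_ms
```

In this sketch a pushed back element blocks collection of its side input window regardless of the watermark, so the element is neither discarded nor turned into an error.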


On Thu, Mar 8, 2018 at 1:52 PM, Shen Li  wrote:

> Hi Lukasz,
>
> Thanks.
>
> >  So having the side input significantly delayed can cause a serious
> backlog on the main input.
>
> Sometimes, side input is delayed and it is out of the applications'
> control. In this situation, should the runner/engine discard pushed back
> main elements if they access expired side windows, or does Beam expect the
> runner/engine to terminate the execution?
>
>
> Shen
>
>
>
>
> On Thu, Mar 8, 2018 at 4:39 PM, Lukasz Cwik  wrote:
>
>> The runner/engine is responsible for pushing back the main input until
>> the side input becomes ready. So having the side input significantly
>> delayed can cause a serious backlog on the main input.
>>
>> On Thu, Mar 8, 2018 at 1:34 PM, Shen Li  wrote:
>>
>>> Hi Lukasz,
>>>
>>> Thanks for the prompt response. Does it mean that if the side input
>>> elements and watermarks got delayed and lagged behind the main input
>>> stream, it is considered as an application problem, and Beam
>>> runners/engines do not need to handle that?
>>>
>>> Best,
>>> Shen
>>>
>>> On Thu, Mar 8, 2018 at 4:15 PM, Lukasz Cwik  wrote:
>>>
>>>> The side input expires relative to the input watermark of the ParDo, so
>>>> what you're suggesting could only happen if the runner had a bug and
>>>> expired the side input before it should have, or if the user pipeline has
>>>> a bug and is attempting to access a window that would always be
>>>> considered beyond the maximum lookback, which is also an error.
>>>>
>>>> On Thu, Mar 8, 2018 at 1:07 PM, Shen Li  wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> When a main input element tries to access an expired side input window
>>>>> (violating maximumLookback), should ParDo discard the element or treat it
>>>>> as an error?
>>>>>
>>>>> Besides, what should ParDo do in the following situation:
>>>>> 1. The side input window W is not expired but unready when the main
>>>>> input element X arrives. So the ParDo pushes back the main input element.
>>>>> 2. Later the side input window W expires before X is processed.
>>>>> In this case, should ParDo throw away X?
>>>>>
>>>>> Thanks,
>>>>> Shen
>>>>>


>>>
>>
>


Re: Main input element accesses expired side input windows

2018-03-08 Thread Lukasz Cwik
The runner/engine is responsible for pushing back the main input until the
side input becomes ready. So having the side input significantly delayed
can cause a serious backlog on the main input.

On Thu, Mar 8, 2018 at 1:34 PM, Shen Li  wrote:

> Hi Lukasz,
>
> Thanks for the prompt response. Does it mean that if the side input
> elements and watermarks got delayed and lagged behind the main input
> stream, it is considered as an application problem, and Beam
> runners/engines do not need to handle that?
>
> Best,
> Shen
>
> On Thu, Mar 8, 2018 at 4:15 PM, Lukasz Cwik  wrote:
>
>> The side input expires relative to the input watermark of the ParDo, so
>> what you're suggesting could only happen if the runner had a bug and
>> expired the side input before it should have, or if the user pipeline has
>> a bug and is attempting to access a window that would always be
>> considered beyond the maximum lookback, which is also an error.
>>
>> On Thu, Mar 8, 2018 at 1:07 PM, Shen Li  wrote:
>>
>>> Hi,
>>>
>>> When a main input element tries to access an expired side input window
>>> (violating maximumLookback), should ParDo discard the element or treat it
>>> as an error?
>>>
>>> Besides, what should ParDo do in the following situation:
>>> 1. The side input window W is not expired but unready when the main
>>> input element X arrives. So the ParDo pushes back the main input element.
>>> 2. Later the side input window W expires before X is processed.
>>> In this case, should ParDo throw away X?
>>>
>>> Thanks,
>>> Shen
>>>
>>
>>
>


Re: Main input element accesses expired side input windows

2018-03-08 Thread Lukasz Cwik
The side input expires relative to the input watermark of the ParDo, so what
you're suggesting could only happen if the runner had a bug and expired the
side input before it should have, or if the user pipeline has a bug and is
attempting to access a window that would always be considered beyond the
maximum lookback, which is also an error.

On Thu, Mar 8, 2018 at 1:07 PM, Shen Li  wrote:

> Hi,
>
> When a main input element tries to access an expired side input window
> (violating maximumLookback), should ParDo discard the element or treat it
> as an error?
>
> Besides, what should ParDo do in the following situation:
> 1. The side input window W is not expired but unready when the main input
> element X arrives. So the ParDo pushes back the main input element.
> 2. Later the side input window W expires before X is processed.
> In this case, should ParDo throw away X?
>
> Thanks,
> Shen
>