Re: Main input element accesses expired side input windows
Thank you, Kenn!

Shen

On Thu, Mar 8, 2018 at 9:58 PM, Kenneth Knowles wrote:
> [...]
Re: Main input element accesses expired side input windows
On Thu, Mar 8, 2018 at 6:50 PM Shen Li wrote:

> Hi Kenn,
>
> I just want to confirm that I understand it correctly.
>
> > - You know that W is expired only when you can be sure that no main input element could reference it.
>
> This is determined by the *main input* watermark, allowedLateness, and maximumLookback, right?
>
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowMappingFn.java#L68

Yes, I think you can use this formula: https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowMappingFn.java#L61

> > when W expires on the side input you make it ready, you process the elements with empty contents on the side input, then you GC the side input.
>
> Even if W is unready according to the *side input* watermark, the runner/engine should still make it ready when it violates maximumLookback and the *main input* watermark. Is that correct?

This is true if there are no buffered elements. Then you can be sure that no main input element will show up that accesses W.

Kenn

> Thanks,
> Shen
> [...]
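To make the formula Kenn links to concrete, here is a minimal Java sketch of the expiry check. It assumes the rule described in the `WindowMappingFn` Javadoc, namely that a side input window becomes unreachable once the main input watermark passes the end of the window plus `maximumLookback` plus the main input's allowed lateness. `SideInputExpiry` and `isExpired` are hypothetical names, not Beam SDK APIs, and timestamps are plain milliseconds rather than Joda `Instant`s.

```java
/**
 * Hypothetical helper (not a Beam SDK API) illustrating the assumed expiry rule:
 * a side input window W is unreachable once the main input watermark passes
 * end(W) + maximumLookback + main input allowedLateness.
 * All timestamps and durations are plain milliseconds.
 */
public final class SideInputExpiry {
  private SideInputExpiry() {}

  /** Returns true once no main input element can still reference W. */
  public static boolean isExpired(
      long sideWindowEndMillis,
      long maximumLookbackMillis,
      long mainAllowedLatenessMillis,
      long mainInputWatermarkMillis) {
    long expiryMillis = sideWindowEndMillis + maximumLookbackMillis + mainAllowedLatenessMillis;
    // "Passes" is modeled as strictly greater than the expiry timestamp.
    return mainInputWatermarkMillis > expiryMillis;
  }

  public static void main(String[] args) {
    long windowEnd = 60_000; // W ends at t = 60 s
    long lookback = 0;       // assumed: no lookback between main and side windows
    long lateness = 120_000; // main input allows two minutes of lateness
    System.out.println(isExpired(windowEnd, lookback, lateness, 150_000)); // false
    System.out.println(isExpired(windowEnd, lookback, lateness, 180_001)); // true
  }
}
```

This only answers whether W is expired from the main input's point of view; whether W is *ready* depends on the side input watermark, which is the separate distinction drawn in the thread.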
Re: Main input element accesses expired side input windows
Hi Kenn,

I just want to confirm that I understand it correctly.

> - You know that W is expired only when you can be sure that no main input element could reference it.

This is determined by the *main input* watermark, allowedLateness, and maximumLookback, right?

https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowMappingFn.java#L68

> when W expires on the side input you make it ready, you process the elements with empty contents on the side input, then you GC the side input.

Even if W is unready according to the *side input* watermark, the runner/engine should still make it ready when it violates maximumLookback and the *main input* watermark. Is that correct?

Thanks,
Shen

On Thu, Mar 8, 2018 at 9:31 PM, Shen Li wrote:
> [...]
Re: Main input element accesses expired side input windows
I see. Thank you, Kenn and Lukasz.

Best,
Shen

On Thu, Mar 8, 2018 at 7:46 PM, Kenneth Knowles wrote:
> [...]
Re: Main input element accesses expired side input windows
I think the description of when a side input is ready vs. expired is the trouble here.

- You know that W is expired only when you can be sure that no main input element could reference it.
- You know that W is ready *even if it got no data* if the input that would end up in W would be dropped (aka when W expires according to the *side input* watermark).

So for your scenario: you push back the elements, which holds W from being collected; when W expires on the side input, you make it ready; you process the elements with empty contents on the side input; then you GC the side input.

Kenn

On Thu, Mar 8, 2018 at 4:32 PM Shen Li wrote:
> [...]
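The scenario Kenn describes (push back X, make W ready once it expires on the side input, process X against empty contents) can be sketched as a small single-window state machine. This is a hypothetical Java model, not runner code: `PushbackSketch`, its methods, and the readiness rule (W is ready once it has data, or once the side input watermark passes end(W) plus the side input's allowed lateness) are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical single-window model of the pushback flow (not Beam runner code).
 * All timestamps are plain milliseconds.
 */
public final class PushbackSketch {
  private final long sideWindowEnd;       // end of side input window W
  private final long sideAllowedLateness; // side input allowed lateness
  private final List<String> pushedBack = new ArrayList<>();
  private final List<String> outputs = new ArrayList<>();
  private List<String> sideContents = null; // null: no data has arrived for W
  private boolean expiredOnSideInput = false;

  public PushbackSketch(long sideWindowEnd, long sideAllowedLateness) {
    this.sideWindowEnd = sideWindowEnd;
    this.sideAllowedLateness = sideAllowedLateness;
  }

  /** W is ready once it has data, or once any further data for it would be dropped. */
  public boolean isReady() {
    return sideContents != null || expiredOnSideInput;
  }

  public void onMainElement(String element) {
    if (isReady()) {
      process(element);
    } else {
      pushedBack.add(element); // buffered elements keep W from being collected
    }
  }

  public void onSideInputData(List<String> contents) {
    if (!expiredOnSideInput) { // data arriving after expiry would be dropped
      sideContents = new ArrayList<>(contents);
      drainPushedBack();
    }
  }

  public void onSideInputWatermark(long watermark) {
    if (watermark > sideWindowEnd + sideAllowedLateness) {
      expiredOnSideInput = true; // W becomes ready, possibly with empty contents
      drainPushedBack();
    }
  }

  public List<String> outputs() {
    return Collections.unmodifiableList(outputs);
  }

  private void drainPushedBack() {
    for (String element : pushedBack) {
      process(element);
    }
    pushedBack.clear(); // nothing references W any more, so it may be GC'd
  }

  private void process(String element) {
    List<String> view = sideContents == null ? Collections.<String>emptyList() : sideContents;
    outputs.add(element + " with side input " + view);
  }

  public static void main(String[] args) {
    PushbackSketch sketch = new PushbackSketch(60_000, 0);
    sketch.onMainElement("X");            // W not ready yet: X is pushed back
    sketch.onSideInputWatermark(30_000);  // W still open on the side input
    sketch.onSideInputWatermark(60_001);  // W expires on the side input
    System.out.println(sketch.outputs()); // [X with side input []]
  }
}
```

In this model X is neither discarded nor treated as an error: it waits until W becomes ready and is then processed against whatever W contains, which may be nothing.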
Re: Main input element accesses expired side input windows
Hi Lukasz,

Let's explain this problem using a specific example.

Say I have a main input element X, which accesses side input window W. When X arrives at a ParDo operator, W is neither ready nor expired. So, in this case, the ParDo should push back X and wait for W to become ready. Say that, after two minutes, W is still unready but has expired because the main input watermark advanced. In this situation, how does Beam expect runners/engines to handle the pushed-back value X? Discard X or throw an error?

Thanks,
Shen

On Thu, Mar 8, 2018 at 6:35 PM, Lukasz Cwik wrote:
> [...]
Re: Main input element accesses expired side input windows
I believe you're missing this point: "and also to not expire the side input till the main input watermark advances beyond the garbage collection hold of the side input."

On Thu, Mar 8, 2018 at 3:33 PM, Shen Li wrote:
> [...]
Re: Main input element accesses expired side input windows
Hi Lukasz,

Thanks again.

> the runner is required to hold back the main input till the side input is ready

Yes, I understand these requirements. But what if the side input expires before it becomes ready?

Shen
Re: Main input element accesses expired side input windows
Neither. The runner is required to hold back the main input till the side input is ready, and also to not expire the side input till the main input watermark advances beyond the garbage collection hold of the side input.

On Thu, Mar 8, 2018 at 1:52 PM, Shen Li wrote:
> Hi Lukasz,
>
> Thanks.
>
> > So having the side input significantly delayed can cause a serious backlog on the main input.
>
> Sometimes the side input is delayed, and this is out of the application's control. In this situation, should the runner/engine discard pushed-back main input elements if they access expired side input windows, or does Beam expect the runner/engine to terminate the execution?
>
> Shen
>
> On Thu, Mar 8, 2018 at 4:39 PM, Lukasz Cwik wrote:
> > [...]
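Lukasz's second condition (do not expire the side input until the main input watermark passes its garbage collection hold) and Kenn's caveat that expiry is only safe when nothing is buffered can be combined into one check. The following is a hypothetical Java sketch under those assumptions: W may be dropped only when the main input watermark passes end(W) + maximumLookback + main allowed lateness *and* no pushed-back element still references W. `SideInputGcHold` and its methods are illustrative names; the sketch only counts references and does not model how a real runner enforces the hold on its watermarks.

```java
/**
 * Hypothetical model (not Beam runner code) of the garbage collection hold
 * on one side input window W. All timestamps are plain milliseconds.
 */
public final class SideInputGcHold {
  private final long expiryTimestamp; // end(W) + maximumLookback + main allowed lateness
  private int pushedBackReferencingW = 0;

  public SideInputGcHold(long sideWindowEnd, long maximumLookback, long mainAllowedLateness) {
    this.expiryTimestamp = sideWindowEnd + maximumLookback + mainAllowedLateness;
  }

  /** A main input element mapping to W was pushed back because W is not ready. */
  public void onPushBack() {
    pushedBackReferencingW++;
  }

  /** A pushed-back element referencing W has finally been processed. */
  public void onProcessed() {
    if (pushedBackReferencingW == 0) {
      throw new IllegalStateException("no pushed-back element references W");
    }
    pushedBackReferencingW--;
  }

  /** True only when W is unreachable and no buffered element still needs it. */
  public boolean canGarbageCollect(long mainInputWatermark) {
    return mainInputWatermark > expiryTimestamp && pushedBackReferencingW == 0;
  }

  public static void main(String[] args) {
    SideInputGcHold w = new SideInputGcHold(60_000, 0, 0);
    w.onPushBack();                                   // X is waiting on W
    System.out.println(w.canGarbageCollect(200_000)); // false: X still holds W
    w.onProcessed();                                  // W became ready, X processed
    System.out.println(w.canGarbageCollect(200_000)); // true
  }
}
```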
Re: Main input element accesses expired side input windows
The runner/engine is responsible for pushing back the main input until the side input becomes ready. So having the side input significantly delayed can cause a serious backlog on the main input.

On Thu, Mar 8, 2018 at 1:34 PM, Shen Li wrote:
> Hi Lukasz,
>
> Thanks for the prompt response. Does this mean that if the side input elements and watermarks get delayed and lag behind the main input stream, it is considered an application problem, and Beam runners/engines do not need to handle that?
>
> Best,
> Shen
>
> On Thu, Mar 8, 2018 at 4:15 PM, Lukasz Cwik wrote:
> > [...]
Re: Main input element accesses expired side input windows
The side input expires relative to the input watermark of the ParDo, so what you're suggesting could only happen if the runner had a bug and expired the side input before it should have, or if the user pipeline has a bug and is attempting to access a window that would always be considered beyond the maximum lookback, which is also an error.

On Thu, Mar 8, 2018 at 1:07 PM, Shen Li wrote:
> Hi,
>
> When a main input element tries to access an expired side input window (violating maximumLookback), should ParDo discard the element or treat it as an error?
>
> Besides, what should ParDo do in the following situation:
> 1. The side input window W is not expired but unready when the main input element X arrives. So the ParDo pushes back the main input element.
> 2. Later the side input window W expires before X is processed.
> In this case, should ParDo throw away X?
>
> Thanks,
> Shen
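Lukasz's point that reaching for a window beyond the maximum lookback is a pipeline bug, rather than something to silently discard, could look like the following access-time check. This is a minimal hypothetical Java sketch, assuming that `maximumLookback` bounds the distance between the end of a main input window and the end of the side input window it maps to; `LookbackCheck` and `checkWithinLookback` are illustrative names, not Beam APIs.

```java
/**
 * Hypothetical access-time check (not a Beam API): a main input window may only
 * map to a side input window whose end is at most maximumLookback before the
 * main window's end. Violations are treated as errors. Times are milliseconds.
 */
public final class LookbackCheck {
  private LookbackCheck() {}

  public static void checkWithinLookback(
      long mainWindowEnd, long sideWindowEnd, long maximumLookback) {
    long distance = mainWindowEnd - sideWindowEnd;
    if (distance > maximumLookback) {
      // Surface the bug instead of dropping the main input element.
      throw new IllegalStateException(
          "Side input window ends " + distance + " ms before the main input window, "
              + "exceeding maximumLookback of " + maximumLookback + " ms");
    }
  }

  public static void main(String[] args) {
    checkWithinLookback(120_000, 60_000, 60_000); // OK: distance equals the lookback
    try {
      checkWithinLookback(180_000, 60_000, 60_000); // distance of 120 s exceeds 60 s
    } catch (IllegalStateException e) {
      System.out.println("error: " + e.getMessage());
    }
  }
}
```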