Re: New DoFn and WindowedValue/WindowingInternals
Hi Amit,

Yes, this is correct. Part of the motivation is that the DoFn API is user-facing, whereas the compressed representation of windowed elements (e.g. access to all windows of an element) and the ability to emit directly into a specified window are implementation details of the runner that are dangerous to expose to SDK users (even I got burnt by it while working on SplittableParDo). So we would like to move WindowedValue into runners-core and keep the semantically clean API in the SDK: access to the current window, and assigning windows via Window.into().

On Sun, Dec 11, 2016 at 11:59 AM Kenneth Knowles wrote:
> You've got it right. My recommendation is to just directly implement it for the Spark runner.
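The distinction drawn above, between the runner's compressed representation (one element carried with all of its windows) and the SDK's view (one element in one current window), can be sketched in plain Java. This is a hypothetical illustration only: `SketchWindow`, `CompressedValue`, and `explode()` are invented names, not Beam classes, and Beam's real WindowedValue carries more state (e.g. pane info).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

/** Hypothetical window: the half-open interval [start, end) in milliseconds. */
final class SketchWindow {
  final long start;
  final long end;

  SketchWindow(long start, long end) {
    this.start = start;
    this.end = end;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof SketchWindow)) {
      return false;
    }
    SketchWindow other = (SketchWindow) o;
    return start == other.start && end == other.end;
  }

  @Override
  public int hashCode() {
    return Objects.hash(start, end);
  }
}

/** Hypothetical runner-internal value: one element carried together with all of its windows. */
final class CompressedValue<T> {
  final T value;
  final long timestamp;
  final List<SketchWindow> windows;

  CompressedValue(T value, long timestamp, List<SketchWindow> windows) {
    this.value = value;
    this.timestamp = timestamp;
    this.windows = Collections.unmodifiableList(new ArrayList<>(windows));
  }

  /**
   * Splits this value into one single-window value per window. This is the view
   * user code should get: each invocation sees exactly one current window.
   */
  List<CompressedValue<T>> explode() {
    List<CompressedValue<T>> result = new ArrayList<>();
    for (SketchWindow w : windows) {
      result.add(new CompressedValue<>(value, timestamp, Collections.singletonList(w)));
    }
    return result;
  }
}
```

For example, an element with timestamp 5 in windows [0, 10) and [5, 15) explodes into two values, each carrying one of those windows, so per-element user code would run once per window. Keeping `explode()` on the runner side means user code never sees or constructs the multi-window form.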
Re: New DoFn and WindowedValue/WindowingInternals
You've got it right. My recommendation is to just implement it directly in the Spark runner; it will often actually clean things up a bit. Here's the analogous change for the Flink runner: https://github.com/apache/incubator-beam/pull/1435/files.

With GABW (GroupAlsoByWindow), I tried keeping some utility expansion in runners-core: making StateInternalsFactory, refactoring GroupAlsoByWindowsDoFn, then GroupByKeyViaGroupByKeyOnly and GroupAlsoByWindow. But it ended up simpler for each runner to skip most of that and do it directly. (They all still share GABW but none of the surrounding bits, IIRC.)

On Sun, Dec 11, 2016 at 10:33 AM, Amit Sela wrote:
> So basically, using the new DoFn with *SplittableParDo*, *Window.Bound*, and *GroupAlsoByWindow* requires a custom per-runner implementation, since they are no longer handled by DoFn, right?
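As a rough illustration of doing window assignment directly in the runner instead of through a DoFn with an outputWindowedValue-style hook, here is a minimal sketch. It assumes sliding windows with zero offset, so a single element can land in several windows, which is exactly the case where the compressed form matters. `SketchWindow`, `SlidingAssigner`, `assign`, and `process` are hypothetical names; this is not Beam's SlidingWindows implementation, though Beam's WindowFn plays the analogous role.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.BiConsumer;

/** Hypothetical window: the half-open interval [start, end) in milliseconds. */
final class SketchWindow {
  final long start;
  final long end;

  SketchWindow(long start, long end) {
    this.start = start;
    this.end = end;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof SketchWindow)) {
      return false;
    }
    SketchWindow other = (SketchWindow) o;
    return start == other.start && end == other.end;
  }

  @Override
  public int hashCode() {
    return Objects.hash(start, end);
  }
}

/**
 * Hypothetical runner-side window assignment for sliding windows. The runner calls
 * this directly, so no user-facing DoFn needs to emit into explicit windows.
 */
final class SlidingAssigner {
  final long size;
  final long period;

  SlidingAssigner(long size, long period) {
    if (size <= 0 || period <= 0) {
      throw new IllegalArgumentException("size and period must be positive");
    }
    this.size = size;
    this.period = period;
  }

  /** Returns every window [start, start + size), start a multiple of period, that contains timestamp. */
  List<SketchWindow> assign(long timestamp) {
    List<SketchWindow> windows = new ArrayList<>();
    // Latest window start at or before the timestamp; floorMod also handles negative timestamps.
    long lastStart = timestamp - Math.floorMod(timestamp, period);
    for (long start = lastStart; start > timestamp - size; start -= period) {
      windows.add(new SketchWindow(start, start + size));
    }
    return windows;
  }

  /** Assigns windows to one element and hands it, with all its windows, to the runner's output path. */
  <T> void process(T element, long timestamp, BiConsumer<T, List<SketchWindow>> downstream) {
    downstream.accept(element, assign(timestamp));
  }
}
```

With a size of 10 and a period of 5, a timestamp of 7 falls into [0, 10) and [5, 15), and `process` passes the element downstream once, carrying both windows. When size equals period this degenerates to fixed windows, with exactly one window per element.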
Re: New DoFn and WindowedValue/WindowingInternals
So basically, using the new DoFn with *SplittableParDo*, *Window.Bound*, and *GroupAlsoByWindow* requires a custom per-runner implementation, since they are no longer handled by DoFn, right?

On Sun, Dec 11, 2016 at 3:42 PM Eugene Kirpichov wrote:
> Hi Amit, I'll comment in more detail later, but meanwhile please take a look at https://github.com/apache/incubator-beam/pull/1565
Re: New DoFn and WindowedValue/WindowingInternals
Hi Amit,

I'll comment in more detail later, but meanwhile please take a look at https://github.com/apache/incubator-beam/pull/1565. It includes a small number of relevant changes to the Spark runner.

Also take a look at the implementation of SplittableParDo (already committed), in particular ProcessFn and its usage in the direct runner. This is exactly what you're looking for: a new DoFn that, with per-runner support, is able to emit multi-windowed values.

On Sun, Dec 11, 2016 at 4:28 AM Amit Sela wrote:
> Hi all,
>
> I've been working on migrating the Spark runner to the new DoFn, and I've stumbled upon a couple of cases where OldDoFn is used in a way that accesses WindowingInternals (outputWindowedValue), such as AssignWindowsDoFn.
>
> Since changing windows is no longer the responsibility of DoFn, I was wondering who does this and how.
>
> Thanks,
> Amit