Hi Kenn,

Thanks a lot for the help!

Shen

On Wed, Feb 8, 2017 at 10:55 PM, Kenneth Knowles <[email protected]> wrote:

> Hi Shen,
>
> Yes, this is how some existing runners do it. Here is one example:
> https://github.com/apache/beam/blob/master/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java#L319
>
> Kenn
>
> On Tue, Feb 7, 2017 at 3:33 PM, Shen Li <[email protected]> wrote:
>
> > Hi Kenn,
> >
> > Thanks for explaining.
> >
> > What if a View directly follows a Create transform:
> > https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java#L198
> >
> > Does the runner need to implement the "Combine" behavior inside the View
> > translator?
> >
> > Thanks,
> >
> > Shen
> >
> > On Tue, Feb 7, 2017 at 5:27 PM, Kenneth Knowles <[email protected]> wrote:
> >
> > > I have a couple of answers inline. Some others may have more to say, or
> > > corrections for what I have said.
> > >
> > > On Tue, Feb 7, 2017 at 12:34 PM, Shen Li <[email protected]> wrote:
> > >
> > > > Hi,
> > > >
> > > > I am trying to understand how does the SideInputHandler work. It seems
> > > > that the SideInputHandler#addSideInputValue method overwrites the
> > > > ValueStates of all windows associated with the input WindowedValue
> > > > (i.e., discards any existing side input states):
> > > >
> > > > https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java#L156
> > > >
> > > > Does it mean that I need to first retrieve existing side input states
> > > > (SideInputHandler#get), and then merge the state with the newly arrived
> > > > one before calling SideInputHandler#addSideInputValue?
> > >
> > > I think in existing implementations a side input after a Combine / Concat
> > > is handled by the expansion of the View transform, so only single values
> > > arrive to the SideInputHandler and it is always the newer triggering of
> > > the data so you can just overwrite.
> > >
> > > > What if the newly arrived WindowedValue associates with multiple
> > > > windows? Should I first extract those windows and then invoke the
> > > > addSideInputValue method multiple times?
> > >
> > > I can definitely answer that WindowedValue with multiple windows is a
> > > compressed representation for multiple WindowedValue each in a single
> > > window. The behavior needs to be the same for the code to work across
> > > runners, since they may compress and expand differently.
> > >
> > > Hope this helps,
> > >
> > > Kenn
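
For anyone reading the archive later, here is a minimal sketch of the two points Kenn makes above: a value tagged with several windows behaves like one copy of the value per window, and a newer firing simply overwrites the older per-window state. This is plain Java and does not use the Beam API. `Windowed`, `Store`, and the window names `w1` and `w2` are hypothetical stand-ins made up for illustration, not the real WindowedValue or SideInputHandler classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SideInputSketch {

    /** Hypothetical stand-in for a value tagged with one or more window names. */
    static final class Windowed<T> {
        final T value;
        final List<String> windows;

        Windowed(T value, List<String> windows) {
            this.value = value;
            this.windows = windows;
        }

        /** Expands the compressed form into one single-window element per window. */
        List<Windowed<T>> explode() {
            List<Windowed<T>> out = new ArrayList<>();
            for (String w : windows) {
                out.add(new Windowed<>(value, Collections.singletonList(w)));
            }
            return out;
        }
    }

    /** Hypothetical per-window store: a newer value replaces the older one. */
    static final class Store<T> {
        private final Map<String, T> state = new HashMap<>();

        void add(Windowed<T> wv) {
            for (String w : wv.windows) {
                state.put(w, wv.value); // overwrite, no merge with existing state
            }
        }

        T get(String window) {
            return state.get(window);
        }

        Map<String, T> snapshot() {
            return new HashMap<>(state);
        }
    }

    public static void main(String[] args) {
        Windowed<String> older = new Windowed<>("v1", Collections.singletonList("w1"));
        Windowed<String> newer = new Windowed<>("v2", Arrays.asList("w1", "w2"));

        // Path A: hand the multi-window value to the store in compressed form.
        Store<String> compressed = new Store<>();
        compressed.add(older);
        compressed.add(newer);

        // Path B: expand first, then add each single-window element.
        Store<String> expanded = new Store<>();
        expanded.add(older);
        for (Windowed<String> single : newer.explode()) {
            expanded.add(single);
        }

        // Both paths should leave the same per-window state.
        System.out.println(compressed.snapshot().equals(expanded.snapshot()));
    }
}
```

In this sketch it makes no difference whether the caller expands the windows first or passes the compressed value once, which is the property a runner would need to preserve.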
