Thanks for the replies. @Ankur I tried putting a GroupByKey between the Window.into and the sink, and it didn't seem to make any difference... @Aljoscha I see, that makes sense - so the windowed write code (which uses TextIO.write().withWindowedWrites()) is not closing the files as soon as the window has ended?
I was trying this out with windowed writes, but what I really want to do doesn't involve windowed writes. I am actually trying to do this:

pipeline
    .apply(PubsubIO.readPubsubMessagesWithAttributes().fromSubscription(..))
    .apply(ParDo.of(new MapToKV()))
    .apply(Window.into(FixedWindows.of(Duration.standardSeconds(10))))
    .apply(Combine.perKey(new MyCombineFn()))
    .apply(ParDo.of(new SendWindowToAPI()));

So here Combine.perKey will do a GroupByKey and then MyCombineFn will aggregate the values for each key. I then want to use another DoFn, SendWindowToAPI, which will send the aggregate result for each window to a REST API. I am trying to hack it this way for now since there is no RestIO sink yet.

I'm having the same problem doing this as when running my windowed file write example: the SendWindowToAPI DoFn seems to only ping the API after a few minutes / 30+ messages have been sent, rather than immediately after each window.

Any ideas what's going on here?

Thanks,
Josh

On Sun, May 7, 2017 at 12:18 PM, Aljoscha Krettek <[email protected]> wrote:

> Hi,
> First, a bit of clarification (or refinement): a windowing strategy is
> used in all subsequent GroupByKey operations until another windowing
> strategy is specified. That being said, from quickly glancing at the
> windowed write-code I have the suspicion that triggers are not used for
> windowed writing and that instead some other scheme is used for determining
> when to close a file.
>
> I’m sure others with more knowledge of those parts will jump in later but
> I nevertheless wanted to give you this quick answer.
>
> Best,
> Aljoscha
>
> On 7. May 2017, at 00:57, Ankur Chauhan <[email protected]> wrote:
>
> That I believe is expected behavior. The windowing strategy is applied at
> the following group by key operation. To have the windows fire the way you
> want, try putting a group by key immediately after the desired windowing
> function.
>
> The messages right now are being bundled aggressively for performance
> reasons and doing a gbk would ensure desired bundle delineations.
>
> Ankur Chauhan
> Sent from my iPhone
>
> On May 6, 2017, at 14:11, Josh <[email protected]> wrote:
>
> Hi all,
> I am using a PubSubIO source, windowing every 10 seconds and then doing
> something with the windows, for example:
>
> pipeline
>     .apply(PubsubIO.readPubsubMessagesWithAttributes().fromSubscription(..))
>     .apply(Window.into(FixedWindows.of(Duration.standardSeconds(10))))
>     .apply(MapElements
>         .into(TypeDescriptors.strings())
>         .via((PubsubMessage msg) -> msg.getAttribute(..)))
>     .apply(new WriteOneFilePerWindow(..));
>
> My expectation was that if I publish a pubsub message, and then publish
> another 10+ seconds later, a single file should be written for the previous
> 10 second window. However I find that I need to publish a lot of messages for
> any files to be written at all (e.g. 30+ messages).
>
> Is this expected behaviour when using PubSubIO? Is there a way to tweak it to
> fire the windows more eagerly?
>
> Thanks,
>
> Josh

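As an illustration of the result expected from the pipeline at the top of this message (one aggregate per key per 10-second window), here is a minimal plain-Java sketch. It is not Beam code and does not model triggers, watermarks, or bundling, so it says nothing about when results are emitted. The class FixedWindowSketch, the Event type, and the choice of a sum as the aggregation are hypothetical stand-ins for MapToKV and MyCombineFn, and it assumes fixed windows aligned to the epoch with no offset.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class FixedWindowSketch {

    /** Hypothetical keyed, timestamped value (stand-in for the output of MapToKV). */
    public static final class Event {
        final String key;
        final long value;
        final long timestampMillis;

        public Event(String key, long value, long timestampMillis) {
            this.key = key;
            this.value = value;
            this.timestampMillis = timestampMillis;
        }
    }

    /** Start of the fixed window containing the timestamp (assumes epoch-aligned windows). */
    public static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    /** Sums values per window start and key (assumed stand-in for a summing CombineFn). */
    public static Map<Long, Map<String, Long>> sumPerKeyPerWindow(List<Event> events, long sizeMillis) {
        Map<Long, Map<String, Long>> result = new TreeMap<>();
        for (Event e : events) {
            long start = windowStart(e.timestampMillis, sizeMillis);
            result.computeIfAbsent(start, s -> new TreeMap<>())
                  .merge(e.key, e.value, Long::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
            new Event("a", 1, 1_000),    // window [0s, 10s)
            new Event("a", 2, 9_999),    // window [0s, 10s)
            new Event("b", 5, 4_000),    // window [0s, 10s)
            new Event("a", 7, 12_500));  // window [10s, 20s)
        // prints {0={a=3, b=5}, 10000={a=7}}
        System.out.println(sumPerKeyPerWindow(events, 10_000));
    }
}
```

In this sketch the two "a" events before the 10-second mark land in the same window and are combined, while the one at 12.5 seconds produces a separate aggregate. Each inner map corresponds to what SendWindowToAPI would receive for one window.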