Hi Thomas,

Yes, that was the problem, and your solution solves it. Thanks for explaining! I think I was confused by the Pubsub source sometimes logging a "last reported watermark", even though apparently the watermark hadn't been updated and propagated to the window function.

Thanks,
Josh

On Wed, May 10, 2017 at 11:39 PM, Thomas Groh wrote:
> Just to make sure I understand the problem, your issue is not that files are not being written once more data is available, but that it takes a long time from data initially showing up to them being written to files, yes?
>
> The line you linked is responsible for not eagerly updating watermarks - the PubsubUnboundedSource estimates a watermark based off of the timestamp of recently received messages; however, we don't want to see a single element for a low-volume stream and immediately believe that we have all of the data up to that point, so we wait until we have seen at least ten elements within a single split before estimating the watermark.
>
> To more aggressively produce output, you can use triggers to fire windows more eagerly. Where you apply your Window PTransform, you can add
>
> `.triggering(AfterWatermark.pastEndOfWindow()
>     .withEarlyFirings(
>         AfterProcessingTime.pastFirstElementInPane()
>             .plusDelayOf(Duration.standardSeconds(10))))`
>
> (hopefully that renders properly)
>
> This will cause a pane to be produced before the window closes if elements have been buffered for more than ten seconds. However, do note that you may get multiple panes within a single window if more data arrives after the ten second delay of the first element. `WriteOneFilePerWindow` must be capable of handling this case.
>
> On Tue, May 9, 2017 at 2:59 AM, Josh wrote:
>
>> I'm just testing this locally at the moment, using the direct runner.
>>
>> No worries if you can't help!
>> I've been looking at the code and feel like maybe this is related to the PubSub source logic. For example, here
>> https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L178
>> there is some hard-coded configuration which says there must be a minimum of 10 messages before the watermark is updated. I will keep looking into it; hopefully someone who understands the PubSub source can give some insight...
>>
>> Best,
>> Josh
>>
>> On Tue, May 9, 2017 at 10:30 AM, Aljoscha Krettek wrote:
>>
>>> Hi Josh,
>>> What is this running on? I suspect the Dataflow service? In that case I’m afraid I can’t help, because I know too little about it.
>>>
>>> Best,
>>> Aljoscha
>>>
>>> On 8. May 2017, at 14:25, Josh wrote:
>>>
>>> Hi Aljoscha, what's the best way to investigate that?
>>>
>>> I can see the logs from PubSubIO (example pasted below). They show a "last reported watermark"; however, it is not consistent (it is often logged as a min timestamp until quite a few messages have been consumed). I assume this is because there are several threads consuming from PubSub and some threads have not seen any messages yet?
>>>
>>> In my logs below you can see that one of the threads reported a watermark: 2017-05-08T11:45:53.798Z last reported watermark. However, even though these watermarks are being reported, my SendWindowToAPI DoFn is not processing any windows until later.
>>>
>>> 05/08 12:46:55 INFO [pool-3-thread-2] o.a.b.s.i.g.p.PubsubUnboundedSource - Pubsub projects/myproject/subscriptions/mysubscription has 1 received messages, 0 current unread messages, 0 current unread bytes, 0 current in-flight msgs, no oldest in-flight, 1 current in-flight checkpoints, 1 max in-flight checkpoints, 4B/s recent read, 1 recent received, 0 recent extended, 1 recent late extended, 0 recent ACKed, 0 recent NACKed, 0 recent expired, 0ms recent message timestamp skew, 0ms recent watermark skew, 0 recent late messages, -290308-12-21T19:59:05.225Z last reported watermark
>>>
>>> 05/08 12:46:56 INFO [pool-3-thread-4] o.a.b.s.i.g.p.PubsubUnboundedSource - Pubsub projects/myproject/subscriptions/mysubscription has 3 received messages, 0 current unread messages, 0 current unread bytes, 1 current in-flight msgs, 7554ms oldest in-flight, 1 current in-flight checkpoints, 2 max in-flight checkpoints, 13B/s recent read, 3 recent received, 0 recent extended, 2 recent late extended, 1 recent ACKed, 0 recent NACKed, 0 recent expired, 14093ms recent message timestamp skew, 9224866280808573ms recent watermark skew, 0 recent late messages, 2017-05-08T11:45:53.798Z last reported watermark
>>>
>>> 05/08 12:46:57 INFO [pool-3-thread-2] o.a.b.s.i.g.p.PubsubUnboundedSource - Pubsub projects/myproject/subscriptions/mysubscription has 1 received messages, 0 current unread messages, 0 current unread bytes, 1 current in-flight msgs, 6423ms oldest in-flight, 1 current in-flight checkpoints, 1 max in-flight checkpoints, 4B/s recent read, 1 recent received, 0 recent extended, 0 recent late extended, 0 recent ACKed, 0 recent NACKed, 0 recent expired, 0ms recent message timestamp skew, 0ms recent watermark skew, 0 recent late messages, -290308-12-21T19:59:05.225Z last reported watermark
>>>
>>> On Mon, May 8, 2017 at 9:56 AM, Aljoscha Krettek wrote:
>>>
>>>> One suspicion I have is that the watermark could be lagging behind a bit. Have you looked at that?
>>>>
>>>> On 7. May 2017, at 22:44, Josh wrote:
>>>>
>>>> Thanks for the replies.
>>>> @Ankur I tried putting a GroupByKey between the Window.into and the sink, and it didn't seem to make any difference...
>>>> @Aljoscha I see, that makes sense. So the windowed write code (which uses TextIO.write().withWindowedWrites()) is not closing the files as soon as the window has ended?
>>>>
>>>> I was trying this out with windowed writes, but what I really want to do doesn't involve windowed writes. I am actually trying to do this:
>>>>
>>>> pipeline
>>>>     .apply(PubsubIO.readPubsubMessagesWithAttributes().fromSubscription(..))
>>>>     .apply(ParDo.of(new MapToKV()))
>>>>     .apply(Window.into(FixedWindows.of(Duration.standardSeconds(10))))
>>>>     .apply(Combine.perKey(new MyCombineFn()))
>>>>     .apply(ParDo.of(new SendWindowToAPI()));
>>>>
>>>> So here Combine.perKey will do a GroupByKey, and then MyCombineFn will aggregate the values for each key. I then want to use another DoFn, SendWindowToAPI, which will send the aggregate result for each window to a REST API. I am trying to hack it this way for now, since there is no RestIO sink yet.
>>>>
>>>> I'm having the same problem with this as with my windowed file-writing example: the SendWindowToAPI DoFn seems to call the API only after a few minutes / 30+ messages have been sent, rather than immediately after each window.
>>>>
>>>> Any ideas what's going on here?
>>>>
>>>> Thanks,
>>>> Josh
>>>>
>>>> On Sun, May 7, 2017 at 12:18 PM, Aljoscha Krettek wrote:
>>>>
>>>>> Hi,
>>>>> First, a bit of clarification (or refinement): a windowing strategy is used in all subsequent GroupByKey operations until another windowing strategy is specified.
>>>>> That being said, from quickly glancing at the windowed-write code, I suspect that triggers are not used for windowed writing and that some other scheme is used instead to determine when to close a file.
>>>>>
>>>>> I’m sure others with more knowledge of those parts will jump in later, but I nevertheless wanted to give you this quick answer.
>>>>>
>>>>> Best,
>>>>> Aljoscha
>>>>>
>>>>> On 7. May 2017, at 00:57, Ankur Chauhan wrote:
>>>>>
>>>>> That, I believe, is expected behavior. The windowing strategy is applied at the following GroupByKey operation. To have the windows fire the way you want, try putting a GroupByKey immediately after the desired windowing function.
>>>>>
>>>>> The messages right now are being bundled aggressively for performance reasons, and doing a GBK would ensure the desired bundle delineations.
>>>>>
>>>>> Ankur Chauhan
>>>>> Sent from my iPhone
>>>>>
>>>>> On May 6, 2017, at 14:11, Josh wrote:
>>>>>
>>>>> Hi all,
>>>>> I am using a PubSubIO source, windowing every 10 seconds and then doing something with the windows, for example:
>>>>>
>>>>> pipeline
>>>>>     .apply(PubsubIO.readPubsubMessagesWithAttributes().fromSubscription(..))
>>>>>     .apply(Window.into(FixedWindows.of(Duration.standardSeconds(10))))
>>>>>     .apply(MapElements
>>>>>         .into(TypeDescriptors.strings())
>>>>>         .via((PubsubMessage msg) -> msg.getAttribute(..)))
>>>>>     .apply(new WriteOneFilePerWindow(..));
>>>>>
>>>>> My expectation was that if I publish a Pub/Sub message and then publish another one 10+ seconds later, a single file should be written for the previous 10-second window. However, I find that I need to publish a lot of messages (e.g. 30+) for any files to be written at all.
>>>>>
>>>>> Is this expected behaviour when using PubSubIO? Is there a way to tweak it to fire the windows more eagerly?
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Josh
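Thomas describes a split that does not estimate a watermark until it has seen at least ten messages. A small plain-Java model makes that behaviour concrete. It is a hypothetical sketch, not the PubsubUnboundedSource implementation. The class and method names are invented, and the real source is more involved than "take the oldest of a batch of ten".

The model still fits the logs above. The `-290308-12-21T19:59:05.225Z` "last reported watermark" appears to be Beam's minimum timestamp (`BoundedWindow.TIMESTAMP_MIN_VALUE`), meaning a split that has not produced an estimate yet. A watermark held at that value keeps 10-second windows from closing under the default trigger.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, simplified model of a "wait for enough messages" watermark estimate.
 * This is NOT the actual PubsubUnboundedSource code.
 */
class MinSampleWatermarkModel {
    /** Stand-in for "no estimate yet" (the logs appear to show Beam's minimum timestamp). */
    static final long NO_ESTIMATE = Long.MIN_VALUE;

    private final int minMessages;                 // e.g. 10, the threshold discussed in the thread
    private final List<Long> sample = new ArrayList<>();
    private long watermark = NO_ESTIMATE;

    MinSampleWatermarkModel(int minMessages) {
        this.minMessages = minMessages;
    }

    /** Records the event timestamp (epoch millis) of a message received by this split. */
    void onMessage(long timestampMillis) {
        sample.add(timestampMillis);
    }

    /**
     * Returns the current estimate. Until the sample holds at least minMessages
     * timestamps, the previous estimate (initially NO_ESTIMATE) is kept.
     */
    long getWatermark() {
        if (sample.size() >= minMessages) {
            long oldest = Long.MAX_VALUE;
            for (long ts : sample) {
                oldest = Math.min(oldest, ts);
            }
            // Keep the model monotonic, then start a fresh sample for the next estimate.
            watermark = Math.max(watermark, oldest);
            sample.clear();
        }
        return watermark;
    }

    public static void main(String[] args) {
        MinSampleWatermarkModel split = new MinSampleWatermarkModel(10);
        for (int i = 1; i <= 9; i++) {
            split.onMessage(i * 1_000L);
        }
        System.out.println(split.getWatermark() == NO_ESTIMATE); // true: only 9 messages so far
        split.onMessage(10_000L);
        System.out.println(split.getWatermark());                // 1000: oldest of the 10 timestamps
    }
}
```

With nine messages the model still reports `NO_ESTIMATE`. The tenth message lets it estimate the watermark. This is consistent with a trickle of test messages leaving windows waiting.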
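Thomas's trigger suggestion can be simulated in the same way. One caveat applies to the real API. In the Beam 2.x Java SDK, a custom `.triggering(...)` on a non-global window is rejected at pipeline construction unless `.withAllowedLateness(...)` and an accumulation mode are also specified. The accumulation mode is `.discardingFiredPanes()` or `.accumulatingFiredPanes()`. The suggestion would therefore usually be completed with something like `.withAllowedLateness(Duration.ZERO).discardingFiredPanes()`. Those two choices are assumptions, not part of the original advice.

The plain-Java model below is a hypothetical sketch, not Beam's trigger implementation. It simulates one 10-second window under that trigger in discarding mode. The class and method names are invented, and late data and allowed lateness are ignored. It shows the two effects Thomas mentions:
- an early pane about ten seconds of processing time after the first buffered element;
- a further pane for the same window when more data arrives afterwards.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of ONE event-time window under
 *   AfterWatermark.pastEndOfWindow()
 *       .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(delay))
 * with discarding panes. Late data and allowed lateness are ignored.
 */
class EarlyFiringWindowModel {
    enum Timing { EARLY, ON_TIME }

    static final class Pane {
        final Timing timing;
        final List<String> elements;

        Pane(Timing timing, List<String> elements) {
            this.timing = timing;
            this.elements = elements;
        }

        @Override
        public String toString() {
            return timing + elements.toString();
        }
    }

    private final long windowEndMillis;   // exclusive end of the window, in event time
    private final long earlyDelayMillis;  // e.g. 10_000 for Duration.standardSeconds(10)
    private final List<String> buffered = new ArrayList<>();
    private final List<Pane> panes = new ArrayList<>();
    private long firstElementProcTime;    // processing time of the first element in the current pane
    private boolean closed;

    EarlyFiringWindowModel(long windowEndMillis, long earlyDelayMillis) {
        this.windowEndMillis = windowEndMillis;
        this.earlyDelayMillis = earlyDelayMillis;
    }

    /** An element assigned to this window arrives at the given processing time. */
    void addElement(String element, long procTimeMillis) {
        if (closed) {
            return; // this model simply drops late data
        }
        if (buffered.isEmpty()) {
            firstElementProcTime = procTimeMillis; // starts the early-firing delay for this pane
        }
        buffered.add(element);
    }

    /** Processing time advances: fire EARLY if the delay since the pane's first element has elapsed. */
    void advanceProcessingTime(long procTimeMillis) {
        if (!closed && !buffered.isEmpty()
                && procTimeMillis >= firstElementProcTime + earlyDelayMillis) {
            fire(Timing.EARLY);
        }
    }

    /** The watermark advances: fire the ON_TIME pane once it reaches the end of the window. */
    void advanceWatermark(long watermarkMillis) {
        if (!closed && watermarkMillis >= windowEndMillis) {
            fire(Timing.ON_TIME); // this model emits the on-time pane even if it is empty
            closed = true;
        }
    }

    private void fire(Timing timing) {
        panes.add(new Pane(timing, new ArrayList<>(buffered)));
        buffered.clear(); // discarding mode: the next pane only holds new elements
    }

    List<Pane> panes() {
        return panes;
    }

    public static void main(String[] args) {
        EarlyFiringWindowModel window = new EarlyFiringWindowModel(10_000L, 10_000L);
        window.addElement("a", 1_000L);
        window.advanceProcessingTime(11_000L); // 10 s after "a" arrived: early pane
        window.addElement("b", 12_000L);
        window.advanceWatermark(10_000L);      // watermark reaches the window end: on-time pane
        System.out.println(window.panes());    // [EARLY[a], ON_TIME[b]]
    }
}
```

The second pane for the same window is the case Thomas warns about. Any downstream step, whether `WriteOneFilePerWindow` or `SendWindowToAPI`, has to tolerate several results per window.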
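Aljoscha points out that a windowing strategy applies to every subsequent GroupByKey until another one is set. That is why `Combine.perKey` in Josh's second pipeline aggregates per key and per window. The hypothetical model below shows only what gets computed, not when:
- it assigns 10-second fixed windows by rounding each timestamp down to a multiple of 10 s (a zero window offset is assumed);
- it groups on (key, window);
- it sums the values as a stand-in for `MyCombineFn`, whose actual logic is not shown in the thread.

When each result is emitted is decided by the trigger. Under the default trigger that happens once the watermark passes the end of the window, which ties back to the stalled watermark discussed above.

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical model of Window.into(FixedWindows.of(10 s)) followed by Combine.perKey(...).
 * A plain sum stands in for MyCombineFn, whose real logic is not shown in the thread.
 */
class FixedWindowCombineModel {
    static final long WINDOW_SIZE_MILLIS = 10_000L;

    /** Start of the fixed window containing the timestamp (zero offset assumed). */
    static long windowStart(long timestampMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, WINDOW_SIZE_MILLIS);
    }

    /**
     * Groups (key, value, timestamp) records by key AND window and sums the values,
     * mirroring the grouping done under the windowing strategy set upstream.
     * Result keys have the form "key@windowStart".
     */
    static Map<String, Long> combinePerKeyAndWindow(String[] keys, long[] values, long[] timestamps) {
        Map<String, Long> sums = new TreeMap<>();
        for (int i = 0; i < keys.length; i++) {
            String groupKey = keys[i] + "@" + windowStart(timestamps[i]);
            sums.merge(groupKey, values[i], Long::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        String[] keys = {"a", "a", "b", "a"};
        long[] values = {1, 2, 5, 7};
        long[] timestamps = {1_000L, 9_999L, 3_000L, 10_000L};
        // Prints {a@0=3, a@10000=7, b@0=5}
        System.out.println(combinePerKeyAndWindow(keys, values, timestamps));
    }
}
```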
