Hi Josh, what is this running on? I suspect the Dataflow service? In that case I’m afraid I can’t help because I know too little about it.
Best,
Aljoscha

> On 8. May 2017, at 14:25, Josh <[email protected]> wrote:
>
> Hi Aljoscha, what's the best way to investigate that?
>
> I can see the logs from PubSubIO (example pasted below). They show a "last
> reported watermark" however it is not consistent (it is often logged as a min
> timestamp until quite a few messages have been consumed) - I assume this is
> because there are several threads consuming from PubSub and some threads have
> not seen any messages yet?
>
> In my logs below you can see that one of the threads reported a watermark:
> 2017-05-08T11:45:53.798Z last reported watermark - however even though these
> watermarks are being reported, my SendWindowToAPI DoFn is not processing any
> windows until later.
>
> 05/08 12:46:55 INFO [pool-3-thread-2] o.a.b.s.i.g.p.PubsubUnboundedSource -
> Pubsub projects/myproject/subscriptions/mysubscription has 1 received
> messages, 0 current unread messages, 0 current unread bytes, 0 current
> in-flight msgs, no oldest in-flight, 1 current in-flight checkpoints, 1 max
> in-flight checkpoints, 4B/s recent read, 1 recent received, 0 recent
> extended, 1 recent late extended, 0 recent ACKed, 0 recent NACKed, 0 recent
> expired, 0ms recent message timestamp skew, 0ms recent watermark skew, 0
> recent late messages, -290308-12-21T19:59:05.225Z last reported watermark
>
> 05/08 12:46:56 INFO [pool-3-thread-4] o.a.b.s.i.g.p.PubsubUnboundedSource -
> Pubsub projects/myproject/subscriptions/mysubscription has 3 received
> messages, 0 current unread messages, 0 current unread bytes, 1 current
> in-flight msgs, 7554ms oldest in-flight, 1 current in-flight checkpoints, 2
> max in-flight checkpoints, 13B/s recent read, 3 recent received, 0 recent
> extended, 2 recent late extended, 1 recent ACKed, 0 recent NACKed, 0 recent
> expired, 14093ms recent message timestamp skew, 9224866280808573ms recent
> watermark skew, 0 recent late messages, 2017-05-08T11:45:53.798Z last
> reported watermark
>
> 05/08 12:46:57 INFO [pool-3-thread-2] o.a.b.s.i.g.p.PubsubUnboundedSource -
> Pubsub projects/myproject/subscriptions/mysubscription has 1 received
> messages, 0 current unread messages, 0 current unread bytes, 1 current
> in-flight msgs, 6423ms oldest in-flight, 1 current in-flight checkpoints, 1
> max in-flight checkpoints, 4B/s recent read, 1 recent received, 0 recent
> extended, 0 recent late extended, 0 recent ACKed, 0 recent NACKed, 0 recent
> expired, 0ms recent message timestamp skew, 0ms recent watermark skew, 0
> recent late messages, -290308-12-21T19:59:05.225Z last reported watermark
>
> On Mon, May 8, 2017 at 9:56 AM, Aljoscha Krettek <[email protected]> wrote:
> One suspicion I have is that the watermark could be lagging behind a bit.
> Have you looked at that?
>
>> On 7. May 2017, at 22:44, Josh <[email protected]> wrote:
>>
>> Thanks for the replies.
>> @Ankur I tried putting a GroupByKey between the Window.into and the sink,
>> and it didn't seem to make any difference...
>> @Aljoscha I see, that makes sense - so the windowed write code (which uses
>> TextIO.write().withWindowedWrites()) is not closing the files as soon as the
>> window has ended?
>>
>> I was trying this out with windowed writes, but what I really want to do
>> doesn't involve windowed writes. I am actually trying to do this:
>>
>> pipeline
>>     .apply(PubsubIO.readPubsubMessagesWithAttributes().fromSubscription(..))
>>     .apply(ParDo.of(new MapToKV()))
>>     .apply(Window.into(FixedWindows.of(Duration.standardSeconds(10))))
>>     .apply(Combine.perKey(new MyCombineFn()))
>>     .apply(ParDo.of(new SendWindowToAPI()));
>>
>> So here Combine.perKey will do a GroupByKey and then MyCombineFn will
>> aggregate the values for each key. I then want to use another DoFn
>> SendWindowToAPI which will ping the aggregate result for each window to a
>> REST API. I am trying to hack it this way for now since there is no RestIO
>> sink yet.
>>
>> I'm having the same problem doing this as when running my write windowed
>> files example - the SendWindowToAPI DoFn seems to only ping the API after a
>> few minutes / 30+ messages have been sent, rather than immediately after
>> each window.
>>
>> Any ideas what's going on here?
>>
>> Thanks,
>> Josh
>>
>> On Sun, May 7, 2017 at 12:18 PM, Aljoscha Krettek <[email protected]> wrote:
>> Hi,
>> First, a bit of clarification (or refinement): a windowing strategy is used
>> in all subsequent GroupByKey operations until another windowing strategy is
>> specified. That being said, from quickly glancing at the windowed write-code
>> I have the suspicion that triggers are not used for windowed writing and
>> that instead some other scheme is used for determining when to close a file.
>>
>> I’m sure others with more knowledge of those parts will jump in later but I
>> nevertheless wanted to give you this quick answer.
>>
>> Best,
>> Aljoscha
>>
>>> On 7. May 2017, at 00:57, Ankur Chauhan <[email protected]> wrote:
>>>
>>> That I believe is expected behavior. The windowing strategy is applied at
>>> the following group by key operation. To have the windows fire the way you
>>> want, try putting a group by key immediately after the desired windowing
>>> function.
>>>
>>> The messages right now are being bundled aggressively for performance
>>> reasons and doing a gbk would ensure desired bundle delineations.
>>>
>>> Ankur Chauhan
>>>
>>> On May 6, 2017, at 14:11, Josh <[email protected]> wrote:
>>>
>>>> Hi all,
>>>> I am using a PubSubIO source, windowing every 10 seconds and then doing
>>>> something with the windows, for example:
>>>>
>>>> pipeline
>>>>     .apply(PubsubIO.readPubsubMessagesWithAttributes().fromSubscription(..))
>>>>     .apply(Window.into(FixedWindows.of(Duration.standardSeconds(10))))
>>>>     .apply(MapElements
>>>>         .into(TypeDescriptors.strings())
>>>>         .via((PubsubMessage msg) -> msg.getAttribute(..)))
>>>>     .apply(new WriteOneFilePerWindow(..));
>>>>
>>>> My expectation was that if I publish a pubsub message, and then publish
>>>> another 10+ seconds later, a single file should be written for the
>>>> previous 10 second window. However I find that I need to publish a lot of
>>>> messages for any files to be written at all (e.g. 30+ messages).
>>>> Is this expected behaviour when using PubSubIO? Is there a way to tweak it
>>>> to fire the windows more eagerly?
>>>> Thanks,
>>>> Josh
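A note on the numbers in the PubsubUnboundedSource log lines quoted in this thread: the "min timestamp" watermark and the very large "recent watermark skew" look consistent with each other. The sketch below uses plain java.time rather than the Beam API. The class and method names are hypothetical, the minimum timestamp is assumed to be Long.MIN_VALUE microseconds expressed in milliseconds, and combining per-reader watermarks by taking their minimum is only a simplified model of the hypothesis raised in the thread, not a statement of how the runner actually does it.

```java
import java.time.Duration;
import java.time.Instant;

public class WatermarkCheck {
    // Assumption: the "min timestamp" is Long.MIN_VALUE microseconds, in milliseconds.
    static final long MIN_TIMESTAMP_MILLIS = Long.MIN_VALUE / 1000;

    // Distance in milliseconds between a reported watermark and the minimum timestamp.
    static long skewMillis(Instant reportedWatermark) {
        return reportedWatermark.toEpochMilli() - MIN_TIMESTAMP_MILLIS;
    }

    // Start of the fixed window (aligned to the epoch) that contains the timestamp.
    static Instant windowStart(Instant timestamp, Duration size) {
        long t = timestamp.toEpochMilli();
        return Instant.ofEpochMilli(t - Math.floorMod(t, size.toMillis()));
    }

    // Simplified model: a window can fire once the smallest reader watermark
    // has reached the end of the window.
    static boolean windowCanFire(Instant windowEnd, long... readerWatermarksMillis) {
        long combined = Long.MAX_VALUE;
        for (long w : readerWatermarksMillis) {
            combined = Math.min(combined, w);
        }
        return combined >= windowEnd.toEpochMilli();
    }

    public static void main(String[] args) {
        Instant reported = Instant.parse("2017-05-08T11:45:53.798Z");
        Duration size = Duration.ofSeconds(10);
        Instant start = windowStart(reported, size);
        Instant end = start.plus(size);

        System.out.println("min timestamp: " + Instant.ofEpochMilli(MIN_TIMESTAMP_MILLIS));
        System.out.println("skew (ms):     " + skewMillis(reported));
        System.out.println("window:        [" + start + ", " + end + ")");
        // One reader is still at the minimum timestamp, the other has reported a watermark.
        System.out.println("can fire:      "
                + windowCanFire(end, MIN_TIMESTAMP_MILLIS, reported.toEpochMilli()));
    }
}
```

Under these assumptions, skewMillis for 2017-05-08T11:45:53.798Z comes out as 9224866280808573, the same value thread-4 logged as "recent watermark skew". If the min-across-readers model holds, a reader that has not yet seen any message would keep the combined watermark at the minimum, so no 10-second window could fire until every reader has advanced.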
