Hi Josh,
What is this running on? I suspect the Dataflow service? If so, I’m 
afraid I can’t help, because I know too little about it.

Best,
Aljoscha

> On 8. May 2017, at 14:25, Josh <[email protected]> wrote:
> 
> Hi Aljoscha, what's the best way to investigate that?
> 
> I can see the logs from PubSubIO (example pasted below). They show a "last 
> reported watermark", but it is not consistent: it is often logged as the minimum 
> timestamp until quite a few messages have been consumed. I assume this is 
> because several threads are consuming from PubSub and some of them have 
> not seen any messages yet?
> 
> In the logs below you can see that one of the threads reported a watermark 
> (2017-05-08T11:45:53.798Z last reported watermark). However, even though these 
> watermarks are being reported, my SendWindowToAPI DoFn does not process any 
> windows until later. 
> 
> 
> 
> 05/08 12:46:55 INFO  [pool-3-thread-2] o.a.b.s.i.g.p.PubsubUnboundedSource - 
> Pubsub projects/myproject/subscriptions/mysubscription has 1 received 
> messages, 0 current unread messages, 0 current unread bytes, 0 current 
> in-flight msgs, no oldest in-flight, 1 current in-flight checkpoints, 1 max 
> in-flight checkpoints, 4B/s recent read, 1 recent received, 0 recent 
> extended, 1 recent late extended, 0 recent ACKed, 0 recent NACKed, 0 recent 
> expired, 0ms recent message timestamp skew, 0ms recent watermark skew, 0 
> recent late messages, -290308-12-21T19:59:05.225Z last reported watermark
> 
> 05/08 12:46:56 INFO  [pool-3-thread-4] o.a.b.s.i.g.p.PubsubUnboundedSource - 
> Pubsub projects/myproject/subscriptions/mysubscription has 3 received 
> messages, 0 current unread messages, 0 current unread bytes, 1 current 
> in-flight msgs, 7554ms oldest in-flight, 1 current in-flight checkpoints, 2 
> max in-flight checkpoints, 13B/s recent read, 3 recent received, 0 recent 
> extended, 2 recent late extended, 1 recent ACKed, 0 recent NACKed, 0 recent 
> expired, 14093ms recent message timestamp skew, 9224866280808573ms recent 
> watermark skew, 0 recent late messages, 2017-05-08T11:45:53.798Z last 
> reported watermark
> 
> 05/08 12:46:57 INFO  [pool-3-thread-2] o.a.b.s.i.g.p.PubsubUnboundedSource - 
> Pubsub projects/myproject/subscriptions/mysubscription has 1 received 
> messages, 0 current unread messages, 0 current unread bytes, 1 current 
> in-flight msgs, 6423ms oldest in-flight, 1 current in-flight checkpoints, 1 
> max in-flight checkpoints, 4B/s recent read, 1 recent received, 0 recent 
> extended, 0 recent late extended, 0 recent ACKed, 0 recent NACKed, 0 recent 
> expired, 0ms recent message timestamp skew, 0ms recent watermark skew, 0 
> recent late messages, -290308-12-21T19:59:05.225Z last reported watermark
> 
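The two unusual numbers in these logs can be checked with a little arithmetic. The sketch below is plain Java (no Beam dependency) and rests on two assumptions: that -290308-12-21T19:59:05.225Z is Beam's minimum timestamp (BoundedWindow.TIMESTAMP_MIN_VALUE, defined in Beam as Long.MIN_VALUE microseconds truncated to milliseconds), and that the runner takes the minimum over the watermarks of all source readers, so a reader that has not received any message yet holds the overall watermark back. PubsubWatermarkSketch and combinedWatermark are hypothetical names.

```java
import java.time.Instant;
import java.util.concurrent.TimeUnit;

final class PubsubWatermarkSketch {
    // Assumed to match Beam's minimum timestamp: Long.MIN_VALUE microseconds,
    // truncated to milliseconds (printed as -290308-12-21T19:59:05.225Z).
    static final long TIMESTAMP_MIN_MILLIS = TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE);

    // Assumption: the watermark seen downstream is the minimum over all readers,
    // so a single idle reader keeps it at the minimum timestamp.
    static long combinedWatermark(long... readerWatermarks) {
        long min = Long.MAX_VALUE;
        for (long w : readerWatermarks) {
            min = Math.min(min, w);
        }
        return min;
    }

    public static void main(String[] args) {
        long reported = Instant.parse("2017-05-08T11:45:53.798Z").toEpochMilli();

        // One reader has a real watermark, the other still reports the minimum.
        long combined = combinedWatermark(reported, TIMESTAMP_MIN_MILLIS);
        System.out.println("combined watermark = " + combined);

        // Distance between the real watermark and the minimum timestamp.
        System.out.println("skew = " + (reported - TIMESTAMP_MIN_MILLIS) + "ms"); // prints "skew = 9224866280808573ms"
    }
}
```

The "9224866280808573ms recent watermark skew" logged by pool-3-thread-4 matches this difference exactly, which is consistent with that reader having reported the minimum timestamp earlier in the same stats period, before its first message arrived.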
> On Mon, May 8, 2017 at 9:56 AM, Aljoscha Krettek <[email protected]> wrote:
> One suspicion I have is that the watermark could be lagging behind a bit. 
> Have you looked at that?
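Why a lagging watermark delays output: with the default trigger, a fixed window is emitted only once the watermark has moved past the end of that window, no matter how much wall-clock time has passed. A minimal plain-Java sketch of that rule, assuming 10-second windows aligned to the epoch (FixedWindows with no offset); FixedWindowFiringSketch and its methods are hypothetical and model the rule, not Beam's implementation.

```java
import java.time.Instant;

final class FixedWindowFiringSketch {
    // Start of the epoch-aligned fixed window that contains an event timestamp.
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    // The window [start, start + size) fires once the watermark is past its last
    // millisecond (start + size - 1), i.e. has reached start + size.
    static boolean hasFired(long windowStartMillis, long sizeMillis, long watermarkMillis) {
        return watermarkMillis >= windowStartMillis + sizeMillis;
    }

    public static void main(String[] args) {
        long size = 10_000L;
        long event = Instant.parse("2017-05-08T11:45:53.798Z").toEpochMilli();
        long start = windowStart(event, size);
        System.out.println("window starts at " + Instant.ofEpochMilli(start));

        // Watermark still inside the window: nothing is emitted yet.
        System.out.println(hasFired(start, size, Instant.parse("2017-05-08T11:45:58Z").toEpochMilli())); // false
        // Watermark past the end of the window: the pane is emitted.
        System.out.println(hasFired(start, size, Instant.parse("2017-05-08T11:46:01Z").toEpochMilli())); // true
    }
}
```

Under this rule, publishing a second message 10+ seconds later is not enough by itself: the first window is only emitted once the source's watermark actually advances past that window's end.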
> 
>> On 7. May 2017, at 22:44, Josh <[email protected]> wrote:
>> 
>> Thanks for the replies.
>> @Ankur I tried putting a GroupByKey between the Window.into and the sink, 
>> and it didn't seem to make any difference...
>> @Aljoscha I see, that makes sense - so the windowed write code (which uses 
>> TextIO.write().withWindowedWrites()) is not closing the files as soon as the 
>> window has ended?
>> 
>> I was trying this out with windowed writes, but what I really want to do 
>> doesn't involve windowed writes. I am actually trying to do this:
>> 
>> pipeline
>> .apply(PubsubIO.readPubsubMessagesWithAttributes().fromSubscription(..))
>> .apply(ParDo.of(new MapToKV()))
>> .apply(Window.into(FixedWindows.of(Duration.standardSeconds(10))))
>> .apply(Combine.perKey(new MyCombineFn()))
>> .apply(ParDo.of(new SendWindowToAPI()));
>> 
>> So here Combine.perKey will do a GroupByKey and then MyCombineFn will 
>> aggregate the values for each key. I then want to use another DoFn 
>> SendWindowToAPI which will ping the aggregate result for each window to a 
>> REST API. I am trying to hack it this way for now since there is no RestIO 
>> sink yet.
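Conceptually, Window.into followed by Combine.perKey keeps one accumulator per (key, window) pair. A minimal plain-Java sketch of that grouping, assuming epoch-aligned 10-second windows and using a sum as a stand-in for MyCombineFn (whose real logic is not shown); PerKeyPerWindowSketch, Event and sumPerKeyPerWindow are hypothetical names, not Beam API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class PerKeyPerWindowSketch {
    // One input element: key, value and event timestamp in epoch millis.
    static final class Event {
        final String key;
        final long value;
        final long timestampMillis;

        Event(String key, long value, long timestampMillis) {
            this.key = key;
            this.value = value;
            this.timestampMillis = timestampMillis;
        }
    }

    // Groups by (key, window start) and sums the values, standing in for a CombineFn.
    // The composite map key "key@windowStart" is used only for readability.
    static Map<String, Long> sumPerKeyPerWindow(List<Event> events, long sizeMillis) {
        Map<String, Long> sums = new TreeMap<>();
        for (Event e : events) {
            long start = e.timestampMillis - Math.floorMod(e.timestampMillis, sizeMillis);
            sums.merge(e.key + "@" + start, e.value, Long::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event("a", 1, 1_000L),
                new Event("a", 2, 9_000L),
                new Event("a", 5, 12_000L),
                new Event("b", 7, 3_000L));
        System.out.println(sumPerKeyPerWindow(events, 10_000L)); // prints {a@0=3, a@10000=5, b@0=7}
    }
}
```

In the real pipeline each of these per-window results would only reach SendWindowToAPI when that window's trigger fires, which is why the timing depends on the watermark rather than on when the messages were published.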
>> 
>> I'm having the same problem with this as with my windowed file-writing 
>> example: the SendWindowToAPI DoFn seems to only ping the API after a 
>> few minutes / 30+ messages have been sent, rather than immediately after 
>> each window.
>> 
>> Any ideas what's going on here?
>> 
>> Thanks,
>> Josh
>> 
>> 
>> 
>> 
>> On Sun, May 7, 2017 at 12:18 PM, Aljoscha Krettek <[email protected]> wrote:
>> Hi,
>> First, a bit of clarification (or refinement): a windowing strategy is used 
>> in all subsequent GroupByKey operations until another windowing strategy is 
>> specified. That being said, from quickly glancing at the windowed-write code 
>> I suspect that triggers are not used for windowed writing and 
>> that some other scheme is used instead to determine when to close a file.
>> 
>> I’m sure others with more knowledge of those parts will jump in later but I 
>> nevertheless wanted to give you this quick answer.
>> 
>> Best,
>> Aljoscha
>> 
>>> On 7. May 2017, at 00:57, Ankur Chauhan <[email protected]> wrote:
>>> 
>>> I believe that is expected behavior. The windowing strategy is applied at 
>>> the next GroupByKey operation. To have the windows fire the way you 
>>> want, try putting a GroupByKey immediately after the desired windowing 
>>> function.
>>> 
>>> The messages right now are being bundled aggressively for performance 
>>> reasons and doing a gbk would ensure desired bundle delineations. 
>>> 
>>> Ankur Chauhan
>>> Sent from my iPhone
>>> 
>>> On May 6, 2017, at 14:11, Josh <[email protected]> wrote:
>>> 
>>>> Hi all,
>>>> I am using a PubSubIO source, windowing every 10 seconds and then doing 
>>>> something with the windows, for example:
>>>> pipeline
>>>>     .apply(PubsubIO.readPubsubMessagesWithAttributes().fromSubscription(..))
>>>>     .apply(Window.into(FixedWindows.of(Duration.standardSeconds(10))))
>>>>     .apply(MapElements
>>>>         .into(TypeDescriptors.strings())
>>>>         .via((PubsubMessage msg) -> msg.getAttribute(..)))
>>>>     .apply(new WriteOneFilePerWindow(..));
>>>> My expectation was that if I publish a pubsub message, and then publish 
>>>> another 10+ seconds later, a single file should be written for the 
>>>> previous 10 second window. However I find that I need to publish a lot of 
>>>> messages for any files to be written at all (e.g. 30+ messages). 
>>>> Is this expected behaviour when using PubSubIO? Is there a way to tweak it 
>>>> to fire the windows more eagerly?
>>>> Thanks,
>>>> Josh 
>> 
>> 
> 
> 
