Hi Aljoscha, what's the best way to investigate that?

I can see the logs from PubsubIO (example pasted below). They show a "last
reported watermark", but it is not consistent - it is often logged as the
minimum timestamp until quite a few messages have been consumed. I assume
this is because there are several threads consuming from PubSub and some of
them have not seen any messages yet?

In the logs below you can see that one of the threads reported a
"last reported watermark" of 2017-05-08T11:45:53.798Z. However, even though
these watermarks are being reported, my SendWindowToAPI DoFn does not
process any windows until much later.



05/08 12:46:55 INFO [pool-3-thread-2] o.a.b.s.i.g.p.PubsubUnboundedSource -
Pubsub projects/myproject/subscriptions/mysubscription has 1 received
messages, 0 current unread messages, 0 current unread bytes, 0 current
in-flight msgs, no oldest in-flight, 1 current in-flight checkpoints, 1 max
in-flight checkpoints, 4B/s recent read, 1 recent received, 0 recent
extended, 1 recent late extended, 0 recent ACKed, 0 recent NACKed, 0 recent
expired, 0ms recent message timestamp skew, 0ms recent watermark skew, 0
recent late messages, -290308-12-21T19:59:05.225Z last reported watermark


05/08 12:46:56 INFO [pool-3-thread-4] o.a.b.s.i.g.p.PubsubUnboundedSource -
Pubsub projects/myproject/subscriptions/mysubscription has 3 received
messages, 0 current unread messages, 0 current unread bytes, 1 current
in-flight msgs, 7554ms oldest in-flight, 1 current in-flight checkpoints, 2
max in-flight checkpoints, 13B/s recent read, 3 recent received, 0 recent
extended, 2 recent late extended, 1 recent ACKed, 0 recent NACKed, 0 recent
expired, 14093ms recent message timestamp skew, 9224866280808573ms recent
watermark skew, 0 recent late messages, 2017-05-08T11:45:53.798Z last
reported watermark


05/08 12:46:57 INFO [pool-3-thread-2] o.a.b.s.i.g.p.PubsubUnboundedSource -
Pubsub projects/myproject/subscriptions/mysubscription has 1 received
messages, 0 current unread messages, 0 current unread bytes, 1 current
in-flight msgs, 6423ms oldest in-flight, 1 current in-flight checkpoints, 1
max in-flight checkpoints, 4B/s recent read, 1 recent received, 0 recent
extended, 0 recent late extended, 0 recent ACKed, 0 recent NACKed, 0 recent
expired, 0ms recent message timestamp skew, 0ms recent watermark skew, 0
recent late messages, -290308-12-21T19:59:05.225Z last reported watermark
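
To see exactly when panes reach the SendWindowToAPI DoFn, I'm thinking of
dropping a small logging step in just before it - roughly this sketch (the
class name and element types are placeholders for my actual ones):

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LogPaneInfoFn extends DoFn<KV<String, Long>, KV<String, Long>> {
  private static final Logger LOG = LoggerFactory.getLogger(LogPaneInfoFn.class);

  @ProcessElement
  public void processElement(ProcessContext c, BoundedWindow window) {
    // Log wall-clock arrival time, element timestamp, window and pane so they
    // can be compared against the "last reported watermark" lines from
    // PubsubUnboundedSource.
    LOG.info("Pane arrived at {}: timestamp={}, window={}, pane={}",
        Instant.now(), c.timestamp(), window, c.pane());
    c.output(c.element());
  }
}

That should at least show whether panes are arriving late relative to the
watermark.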

On Mon, May 8, 2017 at 9:56 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> One suspicion I have is that the watermark could be lagging behind a bit.
> Have you looked at that?
>
> On 7. May 2017, at 22:44, Josh <jof...@gmail.com> wrote:
>
> Thanks for the replies.
> @Ankur I tried putting a GroupByKey between the Window.into and the sink,
> and it didn't seem to make any difference...
> @Aljoscha I see, that makes sense - so the windowed write code (which uses
> TextIO.write().withWindowedWrites()) is not closing the files as soon as
> the window has ended?
>
> I was trying this out with windowed writes, but what I really want to do
> doesn't involve windowed writes. I am actually trying to do this:
>
> pipeline
> .apply(PubsubIO.readPubsubMessagesWithAttributes().fromSubscription(..))
> .apply(ParDo.of(new MapToKV()))
> .apply(Window.into(FixedWindows.of(Duration.standardSeconds(10))))
> .apply(Combine.perKey(new MyCombineFn()))
> .apply(ParDo.of(new SendWindowToAPI()));
>
> So here, Combine.perKey does a GroupByKey and then MyCombineFn aggregates
> the values for each key. I then want another DoFn, SendWindowToAPI, to send
> the aggregate result for each window to a REST API. I am hacking it this way
> for now since there is no RestIO sink yet.
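>
> For reference, SendWindowToAPI is roughly shaped like this (a simplified
> sketch - the endpoint, element types and JSON format are placeholders, not
> my real code):
>
> import java.net.HttpURLConnection;
> import java.net.URL;
> import java.nio.charset.StandardCharsets;
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
> import org.apache.beam.sdk.values.KV;
>
> public class SendWindowToAPI extends DoFn<KV<String, Long>, Void> {
>   @ProcessElement
>   public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
>     // POST the aggregate for this key/window to the REST endpoint.
>     HttpURLConnection conn =
>         (HttpURLConnection) new URL("https://example.com/api/aggregates").openConnection();
>     conn.setRequestMethod("POST");
>     conn.setDoOutput(true);
>     String body = String.format("{\"key\":\"%s\",\"value\":%d,\"window\":\"%s\"}",
>         c.element().getKey(), c.element().getValue(), window);
>     conn.getOutputStream().write(body.getBytes(StandardCharsets.UTF_8));
>     conn.getResponseCode(); // force the request; ignoring the response for now
>     conn.disconnect();
>   }
> }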
>
> I'm having the same problem with this as with my windowed-file-write example
> - the SendWindowToAPI DoFn only seems to call the API after a few minutes /
> 30+ messages have been sent, rather than immediately after each window
> closes.
>
> Any ideas what's going on here?
>
> Thanks,
> Josh
>
>
>
>
> On Sun, May 7, 2017 at 12:18 PM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> Hi,
>> First, a bit of clarification (or refinement): a windowing strategy applies
>> to all subsequent GroupByKey operations until another windowing strategy is
>> specified. That said, from quickly glancing at the windowed-write code I
>> suspect that triggers are not used for windowed writing and that some other
>> scheme determines when to close a file.
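>>
>> (For the non-write part of the pipeline, you could also try being explicit
>> about the trigger on the Window transform, which any subsequent
>> GroupByKey/Combine.perKey would then pick up. Just a sketch, untested, and
>> the element type is only an example:
>>
>> .apply(Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardSeconds(10)))
>>     .triggering(AfterWatermark.pastEndOfWindow())
>>     .withAllowedLateness(Duration.ZERO)
>>     .discardingFiredPanes())
>>
>> As said, the windowed write path may ignore this and use its own logic for
>> closing files.)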
>>
>> I’m sure others with more knowledge of those parts will jump in later but
>> I nevertheless wanted to give you this quick answer.
>>
>> Best,
>> Aljoscha
>>
>> On 7. May 2017, at 00:57, Ankur Chauhan <an...@malloc64.com> wrote:
>>
>> That, I believe, is expected behavior. The windowing strategy is applied at
>> the following GroupByKey operation. To have the windows fire the way you
>> want, try putting a GroupByKey immediately after the desired windowing
>> function.
>>
>> The messages are currently being bundled aggressively for performance
>> reasons, and doing a GroupByKey would ensure the desired bundle
>> delineations.
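>>
>> Something along these lines (an untested sketch - GroupByKey needs KV
>> elements, so you would have to key your messages first, e.g. with WithKeys,
>> and then unwrap the groups again before the sink):
>>
>> .apply(Window.into(FixedWindows.of(Duration.standardSeconds(10))))
>> .apply(WithKeys.of("fixed"))   // a constant key just to satisfy GroupByKey
>> .apply(GroupByKey.create())    // this is where the windowing strategy takes effect
>> .apply(Values.create())
>> .apply(Flatten.iterables())    // back to the original elements, now grouped per window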
>>
>> Ankur Chauhan
>> Sent from my iPhone
>>
>> On May 6, 2017, at 14:11, Josh <jof...@gmail.com> wrote:
>>
>> Hi all,
>> I am using a PubSubIO source, windowing every 10 seconds and then doing
>> something with the windows, for example:
>>
>> pipeline
>>     .apply(PubsubIO.readPubsubMessagesWithAttributes().fromSubscription(..))
>>     .apply(Window.into(FixedWindows.of(Duration.standardSeconds(10))))
>>     .apply(MapElements
>>         .into(TypeDescriptors.strings())
>>         .via((PubsubMessage msg) -> msg.getAttribute(..)))
>>     .apply(new WriteOneFilePerWindow(..));
>>
>> My expectation was that if I publish a Pub/Sub message, and then publish
>> another 10+ seconds later, a single file should be written for the previous
>> 10-second window. However, I find that I need to publish a lot of messages
>> (e.g. 30+) before any files are written at all.
>>
>> Is this expected behaviour when using PubSubIO? Is there a way to tweak it 
>> to fire the windows more eagerly?
>>
>> Thanks,
>>
>> Josh
>>
>>
>>
>
>
