I'm just testing this locally at the moment, using the direct runner.

No worries if you can't help! I've been looking at the code and feel like
maybe this is related to the PubSub source logic - for example here
https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L178
there is some hard-coded configuration which says there must be a minimum of 10
messages before the watermark is updated. I will keep looking into it -
hopefully someone who understands the PubSub source can give some insight...
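
To make the hypothesis above concrete, here is a minimal sketch in plain Java (no Beam dependency) of a reader that refuses to move its watermark until it has seen a threshold number of messages. The class `WatermarkHoldSketch`, the threshold `MIN_MESSAGES = 10`, and the use of `Long.MIN_VALUE` as "no estimate yet" are all assumptions for illustration. This is not the actual `PubsubUnboundedSource` logic, which is more involved.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of a reader that only advances its watermark once it has
 * seen at least MIN_MESSAGES messages. Not Beam's actual implementation.
 */
class WatermarkHoldSketch {
    // Assumed threshold, echoing the "minimum 10 messages" constant mentioned above.
    static final int MIN_MESSAGES = 10;
    // Assumed stand-in for the minimum timestamp a reader reports before it has an estimate.
    static final long NO_WATERMARK = Long.MIN_VALUE;

    private final List<Long> pendingTimestamps = new ArrayList<>();
    private long lastWatermark = NO_WATERMARK;

    /** Records the event timestamp (epoch millis) of a newly read message. */
    void onMessage(long timestampMillis) {
        pendingTimestamps.add(timestampMillis);
    }

    /** Returns the current watermark estimate in epoch millis. */
    long getWatermark() {
        if (pendingTimestamps.size() >= MIN_MESSAGES) {
            // Enough samples: move the watermark to the oldest timestamp seen.
            long oldest = Long.MAX_VALUE;
            for (long t : pendingTimestamps) {
                oldest = Math.min(oldest, t);
            }
            lastWatermark = oldest;
            pendingTimestamps.clear();
        }
        // Too few samples: keep the previous estimate (possibly still NO_WATERMARK).
        return lastWatermark;
    }

    public static void main(String[] args) {
        WatermarkHoldSketch reader = new WatermarkHoldSketch();
        for (int i = 0; i < 9; i++) {
            reader.onMessage(1_000L * i);
        }
        System.out.println(reader.getWatermark() == NO_WATERMARK); // prints true
        reader.onMessage(9_000L);
        System.out.println(reader.getWatermark()); // prints 0
    }
}
```

If a trickle of test messages is spread across several readers, each reader may stay below such a threshold for a long time, which would be one possible explanation for needing 30+ messages before anything fires.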

Best,
Josh

On Tue, May 9, 2017 at 10:30 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi Josh,
> What is this running on? The Dataflow service, I suspect? If so, I’m
> afraid I can’t help, because I know too little about it.
>
> Best,
> Aljoscha
>
> On 8. May 2017, at 14:25, Josh <jof...@gmail.com> wrote:
>
> Hi Aljoscha, what's the best way to investigate that?
>
> I can see the logs from PubSubIO (example pasted below). They show a "last
> reported watermark"; however, it is not consistent (it is often logged as the
> minimum timestamp until quite a few messages have been consumed). I assume
> this is because there are several threads consuming from PubSub and some
> threads have not seen any messages yet?
>
> In my logs below you can see that one of the threads (pool-3-thread-4)
> reported a watermark of 2017-05-08T11:45:53.798Z. However, even though
> these watermarks are being reported, my SendWindowToAPI DoFn does not
> process any windows until later.
>
>
>
> 05/08 12:46:55 INFO [pool-3-thread-2] o.a.b.s.i.g.p.PubsubUnboundedSource
> - Pubsub projects/myproject/subscriptions/mysubscription has 1 received
> messages, 0 current unread messages, 0 current unread bytes, 0 current
> in-flight msgs, no oldest in-flight, 1 current in-flight checkpoints, 1 max
> in-flight checkpoints, 4B/s recent read, 1 recent received, 0 recent
> extended, 1 recent late extended, 0 recent ACKed, 0 recent NACKed, 0 recent
> expired, 0ms recent message timestamp skew, 0ms recent watermark skew, 0
> recent late messages, -290308-12-21T19:59:05.225Z last reported watermark
>
> 05/08 12:46:56 INFO [pool-3-thread-4] o.a.b.s.i.g.p.PubsubUnboundedSource
> - Pubsub projects/myproject/subscriptions/mysubscription has 3 received
> messages, 0 current unread messages, 0 current unread bytes, 1 current
> in-flight msgs, 7554ms oldest in-flight, 1 current in-flight checkpoints, 2
> max in-flight checkpoints, 13B/s recent read, 3 recent received, 0 recent
> extended, 2 recent late extended, 1 recent ACKed, 0 recent NACKed, 0 recent
> expired, 14093ms recent message timestamp skew, 9224866280808573ms recent
> watermark skew, 0 recent late messages, 2017-05-08T11:45:53.798Z last
> reported watermark
>
> 05/08 12:46:57 INFO [pool-3-thread-2] o.a.b.s.i.g.p.PubsubUnboundedSource
> - Pubsub projects/myproject/subscriptions/mysubscription has 1 received
> messages, 0 current unread messages, 0 current unread bytes, 1 current
> in-flight msgs, 6423ms oldest in-flight, 1 current in-flight checkpoints, 1
> max in-flight checkpoints, 4B/s recent read, 1 recent received, 0 recent
> extended, 0 recent late extended, 0 recent ACKed, 0 recent NACKed, 0 recent
> expired, 0ms recent message timestamp skew, 0ms recent watermark skew, 0
> recent late messages, -290308-12-21T19:59:05.225Z last reported watermark
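
One way to read these logs: `-290308-12-21T19:59:05.225Z` is Beam's minimum timestamp (`BoundedWindow.TIMESTAMP_MIN_VALUE`, i.e. -9223372036854775 ms since the epoch), meaning a reader with no estimate yet. The 9224866280808573 ms watermark skew on pool-3-thread-4 is exactly the distance from that minimum to 2017-05-08T11:45:53.798Z (1494243953798 ms). If the overall source watermark is taken as the minimum across readers (a simplified but common model), one idle reader pins it at the minimum, so no 10-second window is ever complete and the default trigger does not fire. The sketch below models that reasoning in plain Java with `java.time`; the class and method names are hypothetical, not Beam API.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;

/**
 * Hypothetical model: the overall source watermark is the minimum of the
 * per-reader watermarks, and a fixed window with the default trigger emits
 * only once that watermark reaches the end of the window.
 */
class WindowFiringSketch {
    static final Duration WINDOW_SIZE = Duration.ofSeconds(10);

    /** Overall watermark = minimum over all readers (splits). */
    static Instant aggregateWatermark(List<Instant> readerWatermarks) {
        Instant min = Instant.MAX;
        for (Instant w : readerWatermarks) {
            if (w.isBefore(min)) {
                min = w;
            }
        }
        return min;
    }

    /** End (exclusive) of the epoch-aligned fixed window containing the timestamp. */
    static Instant windowEnd(Instant timestamp) {
        long size = WINDOW_SIZE.toMillis();
        long start = Math.floorDiv(timestamp.toEpochMilli(), size) * size;
        return Instant.ofEpochMilli(start + size);
    }

    /** True once the aggregate watermark has reached the end of the element's window. */
    static boolean windowFires(Instant elementTime, List<Instant> readerWatermarks) {
        return !aggregateWatermark(readerWatermarks).isBefore(windowEnd(elementTime));
    }

    public static void main(String[] args) {
        Instant event = Instant.parse("2017-05-08T11:45:40Z");
        Instant reported = Instant.parse("2017-05-08T11:45:53.798Z");
        // One reader still has no estimate (modelled as Instant.MIN): nothing fires.
        System.out.println(windowFires(event, List.of(Instant.MIN, reported))); // prints false
        // Every reader is past 11:45:50, so the [11:45:40, 11:45:50) window fires.
        System.out.println(windowFires(event, List.of(reported, reported.plusSeconds(2)))); // prints true
    }
}
```

The key design point being illustrated is the `min`: the watermark can only be as far along as the slowest reader, which would be consistent with windows firing only after enough messages have reached every reader thread.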
>
> On Mon, May 8, 2017 at 9:56 AM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> One suspicion I have is that the watermark could be lagging behind a bit.
>> Have you looked at that?
>>
>> On 7. May 2017, at 22:44, Josh <jof...@gmail.com> wrote:
>>
>> Thanks for the replies.
>> @Ankur I tried putting a GroupByKey between the Window.into and the sink,
>> and it didn't seem to make any difference...
>> @Aljoscha I see, that makes sense - so the windowed write code (which
>> uses TextIO.write().withWindowedWrites()) is not closing the files as
>> soon as the window has ended?
>>
>> I was trying this out with windowed writes, but what I really want to do
>> doesn't involve windowed writes. I am actually trying to do this:
>>
>> pipeline
>>     .apply(PubsubIO.readPubsubMessagesWithAttributes().fromSubscription(..))
>>     .apply(ParDo.of(new MapToKV()))
>>     .apply(Window.into(FixedWindows.of(Duration.standardSeconds(10))))
>>     .apply(Combine.perKey(new MyCombineFn()))
>>     .apply(ParDo.of(new SendWindowToAPI()));
>>
>> So here Combine.perKey will do a GroupByKey and then MyCombineFn will
>> aggregate the values for each key and window. I then want to use another DoFn
>> SendWindowToAPI, which will send the aggregate result for each window to a
>> REST API. I am trying to hack it this way for now since there is no RestIO
>> sink yet.
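
As a rough illustration of what `Window.into(FixedWindows.of(...))` followed by `Combine.perKey` computes, here is a hypothetical plain-Java model that groups values by (key, 10-second window) and combines each group. Summing is an assumption standing in for `MyCombineFn`, whose logic is not shown in the thread, and `PerKeyWindowedSumSketch`/`Element` are invented names. The model ignores timing entirely: in the real pipeline each (key, window) result is only emitted when that window's trigger fires, by default when the watermark passes the end of the window.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical model of fixed 10s windowing followed by a per-key combine:
 * values are grouped by (key, window start) and summed within each group.
 */
class PerKeyWindowedSumSketch {
    static final long WINDOW_MILLIS = 10_000L;

    /** A timestamped key/value element (hypothetical stand-in for a KV with an event time). */
    static final class Element {
        final String key;
        final long timestampMillis;
        final long value;

        Element(String key, long timestampMillis, long value) {
            this.key = key;
            this.timestampMillis = timestampMillis;
            this.value = value;
        }
    }

    /** Returns key -> (window start millis -> combined value), both levels sorted. */
    static Map<String, Map<Long, Long>> combinePerKeyAndWindow(List<Element> elements) {
        Map<String, Map<Long, Long>> result = new TreeMap<>();
        for (Element e : elements) {
            // Assign the element to the epoch-aligned window containing its timestamp.
            long windowStart = Math.floorDiv(e.timestampMillis, WINDOW_MILLIS) * WINDOW_MILLIS;
            result.computeIfAbsent(e.key, k -> new TreeMap<>())
                  .merge(windowStart, e.value, Long::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(combinePerKeyAndWindow(List.of(
            new Element("a", 1_000L, 1L),
            new Element("a", 9_000L, 2L),
            new Element("a", 12_000L, 5L),
            new Element("b", 3_000L, 7L))));
        // prints {a={0=3, 10000=5}, b={0=7}}
    }
}
```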
>>
>> I'm having the same problem here as with my windowed file-writing
>> example: the SendWindowToAPI DoFn only seems to ping the API after a
>> few minutes / 30+ messages have been sent, rather than immediately after
>> each window ends.
>>
>> Any ideas what's going on here?
>>
>> Thanks,
>> Josh
>>
>>
>>
>>
>> On Sun, May 7, 2017 at 12:18 PM, Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>>
>>> Hi,
>>> First, a bit of clarification (or refinement): a windowing strategy is
>>> used in all subsequent GroupByKey operations until another windowing
>>> strategy is specified. That being said, from quickly glancing at the
>>> windowed write-code I have the suspicion that triggers are not used for
>>> windowed writing and that instead some other scheme is used for determining
>>> when to close a file.
>>>
>>> I’m sure others with more knowledge of those parts will jump in later
>>> but I nevertheless wanted to give you this quick answer.
>>>
>>> Best,
>>> Aljoscha
>>>
>>> On 7. May 2017, at 00:57, Ankur Chauhan <an...@malloc64.com> wrote:
>>>
>>> I believe that is expected behavior. The windowing strategy is applied
>>> at the following GroupByKey operation. To have the windows fire the way
>>> you want, try putting a GroupByKey immediately after the desired
>>> windowing function.
>>>
>>> The messages right now are being bundled aggressively for performance
>>> reasons, and doing a GroupByKey would ensure the desired bundle delineations.
>>>
>>> Ankur Chauhan
>>> Sent from my iPhone
>>>
>>> On May 6, 2017, at 14:11, Josh <jof...@gmail.com> wrote:
>>>
>>> Hi all,
>>> I am using a PubSubIO source, windowing every 10 seconds and then doing
>>> something with the windows, for example:
>>>
>>> pipeline
>>>     .apply(PubsubIO.readPubsubMessagesWithAttributes().fromSubscription(..))
>>>     .apply(Window.into(FixedWindows.of(Duration.standardSeconds(10))))
>>>     .apply(MapElements
>>>         .into(TypeDescriptors.strings())
>>>         .via((PubsubMessage msg) -> msg.getAttribute(..)))
>>>     .apply(new WriteOneFilePerWindow(..));
>>>
>>> My expectation was that if I publish a pubsub message, and then publish
>>> another 10+ seconds later, a single file should be written for the previous
>>> 10-second window. However, I find that I need to publish a lot of messages
>>> for any files to be written at all (e.g. 30+ messages).
>>>
>>> Is this expected behaviour when using PubSubIO? Is there a way to tweak it 
>>> to fire the windows more eagerly?
>>>
>>> Thanks,
>>>
>>> Josh
>>>
>>>
>>>
>>
>>
>
>
