Hi Thomas,

Yes, that was the problem, and your solution fixes it. Thanks for
explaining! I think I was confused by the Pubsub source sometimes logging a
"last reported watermark", even though apparently the watermark hadn't been
updated and propagated to the window function.

Thanks,
Josh
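
For anyone reading this thread later, the behaviour Thomas describes in the quoted reply below (no watermark estimate until at least ten elements have been seen within a split) can be sketched in plain Java. This is only an illustration and not the actual PubsubUnboundedSource code: the class name MinSampleWatermark, its methods, and the choice of the earliest observed timestamp as the estimate are all assumptions made for the example.

```java
import java.time.Instant;
import java.util.Optional;

/** Illustrative only: NOT the real PubsubUnboundedSource logic. */
final class MinSampleWatermark {
    private final int minSamples;   // assumed threshold, e.g. 10
    private int seen = 0;           // messages observed so far
    private Instant earliest = null; // earliest timestamp observed so far

    MinSampleWatermark(int minSamples) {
        this.minSamples = minSamples;
    }

    /** Record the timestamp of a newly received message. */
    void observe(Instant timestamp) {
        seen++;
        if (earliest == null || timestamp.isBefore(earliest)) {
            earliest = timestamp;
        }
    }

    /** Empty until enough messages have been seen; then the earliest timestamp. */
    Optional<Instant> estimate() {
        return seen < minSamples ? Optional.empty() : Optional.ofNullable(earliest);
    }

    public static void main(String[] args) {
        MinSampleWatermark watermark = new MinSampleWatermark(10);
        Instant base = Instant.parse("2017-05-08T11:45:00Z");
        for (int i = 0; i < 10; i++) {
            watermark.observe(base.plusSeconds(i));
            System.out.println((i + 1) + " message(s): " + watermark.estimate());
        }
    }
}
```

With a low-volume subscription the threshold is reached slowly, so under this model downstream windows would see no watermark progress for a long time, which matches the delay discussed in the thread.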

On Wed, May 10, 2017 at 11:39 PM, Thomas Groh <[email protected]> wrote:

> Just to make sure I understand the problem, your issue is not that files
> are not being written once more data is available, but that it takes a long
> time from data initially showing up to them being written to files, yes?
>
> The line you linked is responsible for not eagerly updating watermarks -
> the PubsubUnboundedSource estimates a watermark based off of the timestamp
> of recently received messages; however, we don't want to see a single
> element for a low-volume stream and immediately believe that we have all of
> the data up to that point, so we wait until we have seen at least ten
> elements within a single split before estimating the watermark.
>
> To more aggressively produce output, you can use triggers to fire windows
> more eagerly. Where you apply your Window PTransform, you can add
> `.triggering(AfterWatermark.pastEndOfWindow()
>     .withEarlyFirings(
>         AfterProcessingTime.pastFirstElementInPane()
>             .plusDelayOf(Duration.standardSeconds(10))))`
>
> (hopefully that renders properly)
>
> This will cause a pane to be produced before the window closes if elements
> have been buffered for more than ten seconds. However, do note that you may
> get multiple panes within a single window if more data arrives after the
> ten second delay of the first element. `WriteOneFilePerWindow` must be
> capable of handling this case.
>
> On Tue, May 9, 2017 at 2:59 AM, Josh <[email protected]> wrote:
>
>> I'm just testing this locally at the moment, using the direct runner.
>>
>> No worries if you can't help! I've been looking at the code and feel like
>> maybe this is related to the PubSub source logic - for example here
>> https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L178
>> there is some hard-coded configuration which says there must be a minimum
>> of 10 messages before the watermark is updated. I will keep looking into
>> it - hopefully someone who understands the PubSub source can give some
>> insight...
>>
>> Best,
>> Josh
>>
>> On Tue, May 9, 2017 at 10:30 AM, Aljoscha Krettek <[email protected]>
>> wrote:
>>
>>> Hi Josh,
>>> What is this running on? I suspect the Dataflow service? In that case
>>> I’m afraid I can’t help because I know too little about it.
>>>
>>> Best,
>>> Aljoscha
>>>
>>> On 8. May 2017, at 14:25, Josh <[email protected]> wrote:
>>>
>>> Hi Aljoscha, what's the best way to investigate that?
>>>
>>> I can see the logs from PubSubIO (example pasted below). They show a
>>> "last reported watermark" however it is not consistent (it is often logged
>>> as a min timestamp until quite a few messages have been consumed) - I
>>> assume this is because there are several threads consuming from PubSub and
>>> some threads have not seen any messages yet?
>>>
>>> In my logs below you can see that one of the threads reported a
>>> watermark: 2017-05-08T11:45:53.798Z last reported watermark - however
>>> even though these watermarks are being reported, my SendWindowToAPI
>>> DoFn is not processing any windows until later.
>>>
>>>
>>>
>>> 05/08 12:46:55 INFO [pool-3-thread-2] o.a.b.s.i.g.p.PubsubUnboundedSource
>>> - Pubsub projects/myproject/subscriptions/mysubscription has 1 received
>>> messages, 0 current unread messages, 0 current unread bytes, 0 current
>>> in-flight msgs, no oldest in-flight, 1 current in-flight checkpoints, 1 max
>>> in-flight checkpoints, 4B/s recent read, 1 recent received, 0 recent
>>> extended, 1 recent late extended, 0 recent ACKed, 0 recent NACKed, 0 recent
>>> expired, 0ms recent message timestamp skew, 0ms recent watermark skew, 0
>>> recent late messages, -290308-12-21T19:59:05.225Z last reported watermark
>>>
>>> 05/08 12:46:56 INFO [pool-3-thread-4] o.a.b.s.i.g.p.PubsubUnboundedSource
>>> - Pubsub projects/myproject/subscriptions/mysubscription has 3 received
>>> messages, 0 current unread messages, 0 current unread bytes, 1 current
>>> in-flight msgs, 7554ms oldest in-flight, 1 current in-flight checkpoints, 2
>>> max in-flight checkpoints, 13B/s recent read, 3 recent received, 0 recent
>>> extended, 2 recent late extended, 1 recent ACKed, 0 recent NACKed, 0 recent
>>> expired, 14093ms recent message timestamp skew, 9224866280808573ms recent
>>> watermark skew, 0 recent late messages, 2017-05-08T11:45:53.798Z last
>>> reported watermark
>>>
>>> 05/08 12:46:57 INFO [pool-3-thread-2] o.a.b.s.i.g.p.PubsubUnboundedSource
>>> - Pubsub projects/myproject/subscriptions/mysubscription has 1 received
>>> messages, 0 current unread messages, 0 current unread bytes, 1 current
>>> in-flight msgs, 6423ms oldest in-flight, 1 current in-flight checkpoints, 1
>>> max in-flight checkpoints, 4B/s recent read, 1 recent received, 0 recent
>>> extended, 0 recent late extended, 0 recent ACKed, 0 recent NACKed, 0 recent
>>> expired, 0ms recent message timestamp skew, 0ms recent watermark skew, 0
>>> recent late messages, -290308-12-21T19:59:05.225Z last reported watermark
>>>
>>> On Mon, May 8, 2017 at 9:56 AM, Aljoscha Krettek <[email protected]>
>>> wrote:
>>>
>>>> One suspicion I have is that the watermark could be lagging behind a
>>>> bit. Have you looked at that?
>>>>
>>>> On 7. May 2017, at 22:44, Josh <[email protected]> wrote:
>>>>
>>>> Thanks for the replies.
>>>> @Ankur I tried putting a GroupByKey between the Window.into and the
>>>> sink, and it didn't seem to make any difference...
>>>> @Aljoscha I see, that makes sense - so the windowed write code (which
>>>> uses TextIO.write().withWindowedWrites()) is not closing the files as
>>>> soon as the window has ended?
>>>>
>>>> I was trying this out with windowed writes, but what I really want to
>>>> do doesn't involve windowed writes. I am actually trying to do this:
>>>>
>>>> pipeline
>>>>     .apply(PubsubIO.readPubsubMessagesWithAttributes().fromSubscription(..))
>>>>     .apply(ParDo.of(new MapToKV()))
>>>>     .apply(Window.into(FixedWindows.of(Duration.standardSeconds(10))))
>>>>     .apply(Combine.perKey(new MyCombineFn()))
>>>>     .apply(ParDo.of(new SendWindowToAPI()));
>>>>
>>>> So here Combine.perKey will do a GroupByKey and then MyCombineFn will
>>>> aggregate the values for each key. I then want to use another DoFn
>>>> SendWindowToAPI which will ping the aggregate result for each window to a
>>>> REST API. I am trying to hack it this way for now since there is no RestIO
>>>> sink yet.
>>>>
>>>> I'm having the same problem doing this as when running my write
>>>> windowed files example - the SendWindowToAPI DoFn seems to only ping the
>>>> API after a few minutes / 30+ messages have been sent, rather than
>>>> immediately after each window.
>>>>
>>>> Any ideas what's going on here?
>>>>
>>>> Thanks,
>>>> Josh
>>>>
>>>>
>>>>
>>>>
>>>> On Sun, May 7, 2017 at 12:18 PM, Aljoscha Krettek <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>> First, a bit of clarification (or refinement): a windowing strategy is
>>>>> used in all subsequent GroupByKey operations until another windowing
>>>>> strategy is specified. That being said, from quickly glancing at the
>>>>> windowed write-code I have the suspicion that triggers are not used for
>>>>> windowed writing and that instead some other scheme is used for
>>>>> determining when to close a file.
>>>>>
>>>>> I’m sure others with more knowledge of those parts will jump in later
>>>>> but I nevertheless wanted to give you this quick answer.
>>>>>
>>>>> Best,
>>>>> Aljoscha
>>>>>
>>>>> On 7. May 2017, at 00:57, Ankur Chauhan <[email protected]> wrote:
>>>>>
>>>>> That I believe is expected behavior. The windowing strategy is applied
>>>>> at the following group by key operation. To have the windows fire the way
>>>>> you want, try putting a group by key immediately after the desired
>>>>> windowing function.
>>>>>
>>>>> The messages right now are being bundled aggressively for performance
>>>>> reasons and doing a gbk would ensure desired bundle delineations.
>>>>>
>>>>> Ankur Chauhan
>>>>> Sent from my iPhone
>>>>>
>>>>> On May 6, 2017, at 14:11, Josh <[email protected]> wrote:
>>>>>
>>>>> Hi all,
>>>>> I am using a PubSubIO source, windowing every 10 seconds and then
>>>>> doing something with the windows, for example:
>>>>>
>>>>> pipeline
>>>>>     .apply(PubsubIO.readPubsubMessagesWithAttributes().fromSubscription(..))
>>>>>     .apply(Window.into(FixedWindows.of(Duration.standardSeconds(10))))
>>>>>     .apply(MapElements
>>>>>         .into(TypeDescriptors.strings())
>>>>>         .via((PubsubMessage msg) -> msg.getAttribute(..)))
>>>>>     .apply(new WriteOneFilePerWindow(..));
>>>>>
>>>>> My expectation was that if I publish a pubsub message, and then publish
>>>>> another 10+ seconds later, a single file should be written for the
>>>>> previous 10 second window. However, I find that I need to publish a lot of
>>>>> messages for any files to be written at all (e.g. 30+ messages).
>>>>>
>>>>> Is this expected behaviour when using PubSubIO? Is there a way to tweak 
>>>>> it to fire the windows more eagerly?
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Josh
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>>
>>
>
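
As a footnote to the original question at the bottom of the thread, the arithmetic behind 10 second fixed windows can be sketched in plain Java. This is a minimal illustration and not Beam's FixedWindows implementation: the class FixedWindowSketch and both of its methods are hypothetical names, windows are assumed to be aligned to the epoch, and the watermark check assumes the default behaviour of emitting a window's on-time result once the watermark reaches the window end.

```java
import java.time.Duration;
import java.time.Instant;

/** Illustrative only: plain-Java arithmetic, not Beam's FixedWindows class. */
final class FixedWindowSketch {

    /** Start of the epoch-aligned fixed window that contains the timestamp. */
    static Instant windowStart(Instant timestamp, Duration size) {
        long millis = timestamp.toEpochMilli();
        return Instant.ofEpochMilli(millis - Math.floorMod(millis, size.toMillis()));
    }

    /** True once the watermark has reached the end of the window. */
    static boolean watermarkPastEnd(Instant windowStart, Duration size, Instant watermark) {
        return !watermark.isBefore(windowStart.plus(size));
    }

    public static void main(String[] args) {
        Duration size = Duration.ofSeconds(10);
        Instant first = Instant.parse("2017-05-08T11:45:53.798Z");
        Instant second = first.plusSeconds(11); // published 10+ seconds later

        Instant start = windowStart(first, size);
        System.out.println("first message window starts at  " + start);
        System.out.println("second message window starts at " + windowStart(second, size));
        System.out.println("first window done if watermark = second message time? "
            + watermarkPastEnd(start, size, second));
    }
}
```

The two messages land in different windows, so the expectation in the original question is reasonable as far as window assignment goes. The delay comes from the second check: the first window's result is held back until the watermark actually advances past its end, and the source only starts estimating a watermark after it has seen enough messages.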
