Just to make sure I understand the problem: your issue is not that files
are never written once more data is available, but that it takes a long
time from data first showing up until it is written to files, yes?

The line you linked is responsible for not eagerly updating watermarks: the
PubsubUnboundedSource estimates a watermark based on the timestamps of
recently received messages. However, we don't want to see a single element
on a low-volume stream and immediately conclude that we have all of the
data up to that point, so we wait until we have seen at least ten elements
within a single split before estimating the watermark.
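
The effect is roughly the following (an illustrative sketch only, not the
actual PubsubUnboundedSource code; the constant name here is made up, though
the linked line does hard-code a minimum of ten messages):

// Illustrative sketch -- not the real PubsubUnboundedSource implementation.
Instant estimateWatermark(List<Instant> recentTimestamps) {
  final int MIN_WATERMARK_MESSAGES = 10;  // hypothetical name for the hard-coded minimum
  if (recentTimestamps.size() < MIN_WATERMARK_MESSAGES) {
    // Too few elements to trust an estimate: hold the watermark at the minimum
    // timestamp, which is the -290308-12-21T19:59:05.225Z value in the logs below.
    return BoundedWindow.TIMESTAMP_MIN_VALUE;
  }
  // Otherwise, estimate from the oldest of the recently received timestamps.
  return Collections.min(recentTimestamps);
}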

To more aggressively produce output, you can use triggers to fire windows
more eagerly. Where you apply your Window PTransform, you can add
`.triggering(AfterWatermark.pastEndOfWindow()
    .withEarlyFirings(
        AfterProcessingTime.pastFirstElementInPane()
            .plusDelayOf(Duration.standardSeconds(10))))`

(hopefully that renders properly)

This will cause a pane to be produced before the window closes if elements
have been buffered for more than ten seconds. Note, however, that you may
get multiple panes within a single window if more data arrives after the
ten-second delay from the first element. `WriteOneFilePerWindow` must be
capable of handling this case.
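
For completeness, the whole Window transform would then look roughly like the
sketch below. Once you specify a trigger, Beam also requires an allowed
lateness and an accumulation mode; the values here (zero lateness, discarding
panes) are only illustrative, not a recommendation:

Window.<PubsubMessage>into(FixedWindows.of(Duration.standardSeconds(10)))
    // Fire an early pane ten seconds after the first element arrives in the
    // pane, then the final pane when the watermark passes the end of the window.
    .triggering(AfterWatermark.pastEndOfWindow()
        .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
            .plusDelayOf(Duration.standardSeconds(10))))
    // Required once a trigger is set; tune this to how late your data can be.
    .withAllowedLateness(Duration.ZERO)
    // Each fired pane contains only new elements; use accumulatingFiredPanes()
    // if later panes should repeat what earlier panes already contained.
    .discardingFiredPanes()

Each fired pane carries a PaneInfo (getTiming(), isLast(), getIndex()), which a
sink like `WriteOneFilePerWindow` can use to tell early panes apart from the
final one.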

On Tue, May 9, 2017 at 2:59 AM, Josh <[email protected]> wrote:

> I'm just testing this locally at the moment, using the direct runner.
>
> No worries if you can't help! I've been looking at the code and feel like
> maybe this is related to the PubSub source logic - for example here
> https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L178
> there is some hard-coded configuration which says there must be a minimum of
> 10 messages before the
> watermark is updated. I will keep looking into it - hopefully someone who
> understands the PubSub source can give some insight...
>
> Best,
> Josh
>
> On Tue, May 9, 2017 at 10:30 AM, Aljoscha Krettek <[email protected]>
> wrote:
>
>> Hi Josh,
>> What is this running on? I suspect the Dataflow service? In that case I’m
>> afraid I can’t help because I know too little about it.
>>
>> Best,
>> Aljoscha
>>
>> On 8. May 2017, at 14:25, Josh <[email protected]> wrote:
>>
>> Hi Aljoscha, what's the best way to investigate that?
>>
>> I can see the logs from PubSubIO (example pasted below). They show a
>> "last reported watermark" however it is not consistent (it is often logged
>> as a min timestamp until quite a few messages have been consumed) - I
>> assume this is because there are several threads consuming from PubSub and
>> some threads have not seen any messages yet?
>>
>> In my logs below you can see that one of the threads reported a
>> watermark: 2017-05-08T11:45:53.798Z last reported watermark - however
>> even though these watermarks are being reported, my SendWindowToAPI DoFn
>> is not processing any windows until later.
>>
>>
>>
>> 05/08 12:46:55 INFO [pool-3-thread-2] o.a.b.s.i.g.p.PubsubUnboundedSource
>> - Pubsub projects/myproject/subscriptions/mysubscription has 1 received
>> messages, 0 current unread messages, 0 current unread bytes, 0 current
>> in-flight msgs, no oldest in-flight, 1 current in-flight checkpoints, 1 max
>> in-flight checkpoints, 4B/s recent read, 1 recent received, 0 recent
>> extended, 1 recent late extended, 0 recent ACKed, 0 recent NACKed, 0 recent
>> expired, 0ms recent message timestamp skew, 0ms recent watermark skew, 0
>> recent late messages, -290308-12-21T19:59:05.225Z last reported watermark
>>
>> 05/08 12:46:56 INFO [pool-3-thread-4] o.a.b.s.i.g.p.PubsubUnboundedSource
>> - Pubsub projects/myproject/subscriptions/mysubscription has 3 received
>> messages, 0 current unread messages, 0 current unread bytes, 1 current
>> in-flight msgs, 7554ms oldest in-flight, 1 current in-flight checkpoints, 2
>> max in-flight checkpoints, 13B/s recent read, 3 recent received, 0 recent
>> extended, 2 recent late extended, 1 recent ACKed, 0 recent NACKed, 0 recent
>> expired, 14093ms recent message timestamp skew, 9224866280808573ms recent
>> watermark skew, 0 recent late messages, 2017-05-08T11:45:53.798Z last
>> reported watermark
>>
>> 05/08 12:46:57 INFO [pool-3-thread-2] o.a.b.s.i.g.p.PubsubUnboundedSource
>> - Pubsub projects/myproject/subscriptions/mysubscription has 1 received
>> messages, 0 current unread messages, 0 current unread bytes, 1 current
>> in-flight msgs, 6423ms oldest in-flight, 1 current in-flight checkpoints, 1
>> max in-flight checkpoints, 4B/s recent read, 1 recent received, 0 recent
>> extended, 0 recent late extended, 0 recent ACKed, 0 recent NACKed, 0 recent
>> expired, 0ms recent message timestamp skew, 0ms recent watermark skew, 0
>> recent late messages, -290308-12-21T19:59:05.225Z last reported watermark
>>
>> On Mon, May 8, 2017 at 9:56 AM, Aljoscha Krettek <[email protected]>
>> wrote:
>>
>>> One suspicion I have is that the watermark could be lagging behind a
>>> bit. Have you looked at that?
>>>
>>> On 7. May 2017, at 22:44, Josh <[email protected]> wrote:
>>>
>>> Thanks for the replies.
>>> @Ankur I tried putting a GroupByKey between the Window.into and the
>>> sink, and it didn't seem to make any difference...
>>> @Aljoscha I see, that makes sense - so the windowed write code (which
>>> uses TextIO.write().withWindowedWrites()) is not closing the files as
>>> soon as the window has ended?
>>>
>>> I was trying this out with windowed writes, but what I really want to do
>>> doesn't involve windowed writes. I am actually trying to do this:
>>>
>>> pipeline
>>> .apply(PubsubIO.readPubsubMessagesWithAttributes().fromSubscription(..))
>>> .apply(ParDo.of(new MapToKV()))
>>> .apply(Window.into(FixedWindows.of(Duration.standardSeconds(10))))
>>> .apply(Combine.perKey(new MyCombineFn()))
>>> .apply(ParDo.of(new SendWindowToAPI()));
>>>
>>> So here Combine.perKey will do a GroupByKey and then MyCombineFn will
>>> aggregate the values for each key. I then want to use another DoFn
>>> SendWindowToAPI which will ping the aggregate result for each window to a
>>> REST API. I am trying to hack it this way for now since there is no RestIO
>>> sink yet.
>>>
>>> I'm having the same problem with this as with my windowed file-writing
>>> example - the SendWindowToAPI DoFn only seems to ping the API after a
>>> few minutes / 30+ messages have been sent, rather than immediately after
>>> each window.
>>>
>>> Any ideas what's going on here?
>>>
>>> Thanks,
>>> Josh
>>>
>>>
>>>
>>>
>>> On Sun, May 7, 2017 at 12:18 PM, Aljoscha Krettek <[email protected]>
>>> wrote:
>>>
>>>> Hi,
>>>> First, a bit of clarification (or refinement): a windowing strategy is
>>>> used in all subsequent GroupByKey operations until another windowing
>>>> strategy is specified. That being said, from quickly glancing at the
>>>> windowed write code I have the suspicion that triggers are not used for
>>>> windowed writing and that instead some other scheme is used for determining
>>>> when to close a file.
>>>>
>>>> I’m sure others with more knowledge of those parts will jump in later
>>>> but I nevertheless wanted to give you this quick answer.
>>>>
>>>> Best,
>>>> Aljoscha
>>>>
>>>> On 7. May 2017, at 00:57, Ankur Chauhan <[email protected]> wrote:
>>>>
>>>> That I believe is expected behavior. The windowing strategy is applied
>>>> at the following group by key operation. To have the windows fire the way
>>>> you want, try putting a group by key immediately after the desired
>>>> windowing function.
>>>>
>>>> The messages right now are being bundled aggressively for performance
>>>> reasons and doing a gbk would ensure desired bundle delineations.
>>>>
>>>> Ankur Chauhan
>>>> Sent from my iPhone
>>>>
>>>> On May 6, 2017, at 14:11, Josh <[email protected]> wrote:
>>>>
>>>> Hi all,
>>>> I am using a PubSubIO source, windowing every 10 seconds and then doing
>>>> something with the windows, for example:
>>>>
>>>> pipeline
>>>>     .apply(PubsubIO.readPubsubMessagesWithAttributes().fromSubscription(..))
>>>>     .apply(Window.into(FixedWindows.of(Duration.standardSeconds(10))))
>>>>     .apply(MapElements
>>>>         .into(TypeDescriptors.strings())
>>>>         .via((PubsubMessage msg) -> msg.getAttribute(..)))
>>>>     .apply(new WriteOneFilePerWindow(..));
>>>>
>>>> My expectation was that if I publish a pubsub message, and then publish
>>>> another 10+ seconds later, a single file should be written for the
>>>> previous 10-second window. However, I find that I need to publish a lot of
>>>> messages for any files to be written at all (e.g. 30+ messages).
>>>>
>>>> Is this expected behaviour when using PubSubIO? Is there a way to tweak it 
>>>> to fire the windows more eagerly?
>>>>
>>>> Thanks,
>>>>
>>>> Josh
>>>>
>>>>
>>>>
>>>
>>>
>>
>>
>
