Thanks for the replies.
@Ankur I tried putting a GroupByKey between the Window.into and the sink,
and it didn't seem to make any difference...
@Aljoscha I see, that makes sense - so the windowed write code (which uses
TextIO.write().withWindowedWrites()) is not closing the files as soon as
the window has ended?
I was trying this out with windowed writes, but what I really want to do
doesn't involve windowed writes. I am actually trying to do this:
pipeline
    .apply(PubsubIO.readPubsubMessagesWithAttributes().fromSubscription(..))
    .apply(ParDo.of(new MapToKV()))
    .apply(Window.into(FixedWindows.of(Duration.standardSeconds(10))))
    .apply(Combine.perKey(new MyCombineFn()))
    .apply(ParDo.of(new SendWindowToAPI()));
So here Combine.perKey will do a GroupByKey and then MyCombineFn will
aggregate the values for each key. I then want to use another DoFn
SendWindowToAPI which will ping the aggregate result for each window to a
REST API. I am trying to hack it this way for now since there is no RestIO
sink yet.
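To make the expected result concrete, here is a plain-Java sketch (no Beam dependency) of what I expect the windowed Combine.perKey to produce: one aggregate per key per 10-second window. The class and method names are hypothetical, and a simple sum stands in for MyCombineFn, whose real logic is not shown here.

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch only; this is not Beam code and all names are hypothetical.
class WindowedCombineSketch {
    static final long WINDOW_MILLIS = 10_000L; // 10-second fixed windows

    // Start of the fixed window containing a non-negative timestamp.
    static long windowStart(long timestampMillis) {
        return timestampMillis - (timestampMillis % WINDOW_MILLIS);
    }

    // Aggregates values per (key, window). Summing is an assumption that
    // stands in for whatever MyCombineFn actually computes.
    static Map<String, Long> combinePerKeyAndWindow(
            String[] keys, long[] timestampsMillis, long[] values) {
        Map<String, Long> result = new TreeMap<>();
        for (int i = 0; i < keys.length; i++) {
            String slot = keys[i] + "@" + windowStart(timestampsMillis[i]);
            result.merge(slot, values[i], Long::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        String[] keys = {"a", "a", "b", "a"};
        long[] timestamps = {1_000L, 9_000L, 2_000L, 12_000L};
        long[] values = {1L, 2L, 5L, 4L};
        // Each entry is what SendWindowToAPI would receive for one key and window.
        System.out.println(combinePerKeyAndWindow(keys, timestamps, values));
    }
}
```

Each map entry corresponds to one call I would expect SendWindowToAPI to make.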
I'm having the same problem here as with my windowed file-writing
example: the SendWindowToAPI DoFn seems to ping the API only after a
few minutes or after 30+ messages have been sent, rather than immediately
after each window ends.
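For reference, this is the firing rule I am assuming, written as a plain-Java sketch (no Beam dependency, hypothetical names): as far as I understand, with the default trigger a window's result is emitted only once the watermark reaches the end of that window, and the watermark is an estimate made by the source rather than the wall clock. If that estimate lags, that might be what I am seeing, but I have not confirmed it.

```java
// Plain-Java sketch only; this is not Beam code and all names are hypothetical.
class WatermarkFiringSketch {
    static final long WINDOW_MILLIS = 10_000L; // 10-second fixed windows

    // End (exclusive) of the fixed window containing a non-negative timestamp.
    static long windowEnd(long timestampMillis) {
        return timestampMillis - (timestampMillis % WINDOW_MILLIS) + WINDOW_MILLIS;
    }

    // Assumption: the window's result is emitted only once the watermark
    // has reached the end of the window the element belongs to.
    static boolean hasFired(long elementTimestampMillis, long watermarkMillis) {
        return watermarkMillis >= windowEnd(elementTimestampMillis);
    }

    public static void main(String[] args) {
        long first = 3_000L; // message falling in the window [0s, 10s)
        // Even if a second message is published 10+ seconds later, the first
        // window stays open while the source's watermark estimate is behind.
        System.out.println(hasFired(first, 8_000L));  // watermark still lagging
        System.out.println(hasFired(first, 10_000L)); // watermark has caught up
    }
}
```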
Any ideas what's going on here?
Thanks,
Josh
On Sun, May 7, 2017 at 12:18 PM, Aljoscha Krettek wrote:
> Hi,
> First, a bit of clarification (or refinement): a windowing strategy is
> used in all subsequent GroupByKey operations until another windowing
> strategy is specified. That being said, from quickly glancing at the
> windowed write-code I have the suspicion that triggers are not used for
> windowed writing and that instead some other scheme is used for determining
> when to close a file.
>
> I’m sure others with more knowledge of those parts will jump in later but
> I nevertheless wanted to give you this quick answer.
>
> Best,
> Aljoscha
>
> On 7. May 2017, at 00:57, Ankur Chauhan wrote:
>
> That I believe is expected behavior. The windowing strategy is applied at
> the following group by key operation. To have the windows fire the way you
> want, try putting a group by key immediately after the desired windowing
> function.
>
> The messages right now are being bundled aggressively for performance
> reasons and doing a gbk would ensure desired bundle delineations.
>
> Ankur Chauhan
>
> On May 6, 2017, at 14:11, Josh wrote:
>
> Hi all,
> I am using a PubSubIO source, windowing every 10 seconds and then doing
> something with the windows, for example:
>
> pipeline
>     .apply(PubsubIO.readPubsubMessagesWithAttributes().fromSubscription(..))
>     .apply(Window.into(FixedWindows.of(Duration.standardSeconds(10))))
>     .apply(MapElements
>         .into(TypeDescriptors.strings())
>         .via((PubsubMessage msg) -> msg.getAttribute(..)))
>     .apply(new WriteOneFilePerWindow(..));
>
> My expectation was that if I publish a pubsub message, and then publish
> another 10+ seconds later, a single file should be written for the previous
> 10 second window. However I find that I need to publish a lot of messages for
> any files to be written at all (e.g. 30+ messages).
>
> Is this expected behaviour when using PubSubIO? Is there a way to tweak it to
> fire the windows more eagerly?
>
> Thanks,
>
> Josh