Re: Beam discarding massive amount of events due to Window object or inner processing

2019-10-16 Thread Tim Sell
I think everyone who followed this thread learned something! I know I did.

Thanks for asking these questions. The summary and code snippets were just
the right length to be accessible and focussed.

On Wed, 16 Oct 2019, 06:04 Eddy G,  wrote:

> Thank you so so much guys for the amazing feedback you're giving me!
>
> I'm applying all of it and diving deeper into the details to see where I
> can go from there, so I can get the pipeline performance even
> better.
>
> Again, really appreciated guys, you are amazing.
>


Re: Beam discarding massive amount of events due to Window object or inner processing

2019-10-14 Thread Tim Sell
You're getting 1 shard per pane, and you get a pane every time the trigger
fires on an early firing, plus another one for the final on-time pane. To have
1 file with 1 shard for every 15-minute window you need to fire only on window
close, i.e. AfterWatermark.pastEndOfWindow() without early firings.
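
A minimal sketch of that triggering, assuming 15-minute fixed windows over Avro
GenericRecords; the allowed lateness value is just a placeholder, not your exact
code:

import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

class OnTimeOnlyWindowing {
  // Fire a single on-time pane per 15-minute window (no early firings), so a
  // downstream FileIO write with withNumShards(1) yields one file per window.
  static Window<GenericRecord> onTimeOnly() {
    return Window.<GenericRecord>into(FixedWindows.of(Duration.standardMinutes(15)))
        .triggering(AfterWatermark.pastEndOfWindow())
        .withAllowedLateness(Duration.standardMinutes(5)) // placeholder; tune to your lateness budget
        .discardingFiredPanes();
  }
}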

On Mon, 14 Oct 2019, 14:35 Eddy G,  wrote:

> Thanks a lot everyone for the really valuable feedback!
>
> Just updated my code, made some minor refactoring, and it seems to be working
> like a charm. There is still some data being dropped due to lateness (but I'm
> talking about 100 elements per 2 million, so no "big deal" there; I will
> look into extending the allowed lateness and the overall performance bits that
> I'm missing).
>
> A thing that worries me a lot is that the wall time has kept climbing, now up
> to 1 day and 3 hours, in the stage that is in charge of writing all the
> captured data into Parquet files, presumably due to the .parquet file writing
> code.
>
> I suppose this is also the reason why I still get tons of small Parquet files
> within the same bucket, as in a perfect scenario I should only have 4 files
> per hour (1 every 15 minutes, given the Window length), when I'm currently
> getting 60+!
>
> .apply("Write .parquet File(s)",
> FileIO
> .writeDynamic()
> .by((SerializableFunction)
> event -> {
> // specify partitioning here
> })
> .via(ParquetIO.sink(AVRO_SCHEMA))
> .to(options.getOutputDirectory())
> .withNaming(type -> ParquetFileNaming.getNaming(...))
> .withDestinationCoder(StringUtf8Coder.of())
> .withNumShards(1) // should this be 0? Could this
> imply increasing of costs if set to 0?
>
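
For what it's worth, a rough sketch of how that writeDynamic call might look
fully typed; the partition key, naming helper, and field name below are
placeholders, not the original ParquetFileNaming code:

.apply("Write .parquet File(s)",
    FileIO.<String, GenericRecord>writeDynamic()
        .by((SerializableFunction<GenericRecord, String>) event ->
            event.get("date").toString())               // placeholder partition key
        .via(ParquetIO.sink(AVRO_SCHEMA))
        .to(options.getOutputDirectory())
        .withNaming(partition ->
            FileIO.Write.defaultNaming(partition, ".parquet")) // stand-in for ParquetFileNaming.getNaming(...)
        .withDestinationCoder(StringUtf8Coder.of())
        .withNumShards(1));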


Re: Live fixing of a Beam bug on July 25 at 3:30pm-4:30pm PST

2019-07-18 Thread Tim Sell
+1, I'd love to see this as a recording. Will you stick it up on YouTube
afterwards?

On Thu, Jul 18, 2019 at 4:00 AM sridhar inuog 
wrote:

> Thanks, Pablo! Looking forward to it! Hopefully it will be recorded as
> well.
>
> On Wed, Jul 17, 2019 at 2:50 PM Pablo Estrada  wrote:
>
>> Yes! So I will be working on a small feature request for Java's
>> BigQueryIO: https://issues.apache.org/jira/browse/BEAM-7607
>>
>> Maybe I'll do something for Python next month. : )
>> Best
>> -P.
>>
>> On Wed, Jul 17, 2019 at 12:32 PM Rakesh Kumar 
>> wrote:
>>
>>> +1, I really appreciate this initiative. It would be really helpful for
>>> newbies like me.
>>>
>>> Is it possible to list out the things that you are planning to
>>> cover?
>>>
>>>
>>>
>>>
>>> On Tue, Jul 16, 2019 at 11:19 AM Yichi Zhang  wrote:
>>>
 Thanks for organizing this Pablo, it'll be very helpful!

 On Tue, Jul 16, 2019 at 10:57 AM Pablo Estrada 
 wrote:

> Hello all,
> I'll be having a session where I live-fix a Beam bug for 1 hour next
> week. Everyone is invited.
>
> It will be on July 25, between 3:30pm and 4:30pm PST. Hopefully I will
> finish a full change in that time frame, but we'll see.
>
> I have not yet decided if I will do this via Hangouts or via a YouTube
> livestream. In any case, I will share the link here in the next few
> days.
>
> I will most likely work on the Java SDK (I have a little feature
> request in mind).
>
> Thanks!
> -P.
>



Re: PubSubIO watermark not advancing for low volumes

2019-05-15 Thread Tim Sell
Thanks!

I made a jira
https://issues.apache.org/jira/browse/BEAM-7322

And dumped my sample code here:
https://github.com/tims/beam/tree/master/pubsub-watermark

*From: *Alexey Romanenko 
*Date: *Wed, May 15, 2019 at 12:18 AM
*To: * 

Not sure that this can be very helpful but I recall a similar issue with
> KinesisIO [1] [2] and it was a bug in MovingFunction which was fixed.
>
> [1] https://issues.apache.org/jira/browse/BEAM-5063
> [2] https://github.com/apache/beam/pull/6178
>
> On 13 May 2019, at 20:52, Kenneth Knowles  wrote:
>
> You should definitely not feel foolish. That was a great report. I expect
> many users face the same situation. If they are lurking on this list, then
> you will have helped them already.
>
> Reza - I expect you should weigh in on the Jira, too, since the "one
> message test" use case seems like it wouldn't work at all with those
> MovingFunction params. But I may not understand all the subtleties of the
> connector.
>
> Kenn
>
> *From: *Tim Sell 
> *Date: *Mon, May 13, 2019 at 8:06 AM
> *To: * 
>
>> Thanks for the feedback. I did some more investigating after you said a 1
>> second frequency should be enough to sample on... and it is. I feel foolish.
>> I think I just wasn't waiting long enough, as it takes minutes to close
>> the windows. We had waited much longer when we were just sending messages
>> manually and never had a window close.
>>
>> I'm generating some stats of lag times to window closing for different
>> frequencies, with code so people can reproduce it, then I'll add this to a
>> jira ticket.
>>
>> *From: *Kenneth Knowles 
>> *Date: *Mon, May 13, 2019 at 10:48 AM
>> *To: * , dev
>>
>> Nice analysis & details!
>>>
>>> Thanks to your info, I think it is the configuration of MovingFunction
>>> [1] that is the likely culprit, but I don't totally understand why. It is
>>> configured like so:
>>>
>>>  - store 60 seconds of data
>>>  - update data every 5 seconds
>>>  - require at least 10 messages to be 'significant'
>>>  - require messages from at least 2 distinct 5-second update periods to
>>> be 'significant'
>>>
>>> I would expect a rate of 1 message per second to satisfy this. I may
>>> have read something wrong.
>>>
>>> Have you filed an issue in Jira [2]?
>>>
>>> Kenn
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L508
>>> [2] https://issues.apache.org/jira/projects/BEAM/issues
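
A toy check of those thresholds (not Beam's actual MovingFunction code; the
bucket size and limits just follow the summary above) agrees that a steady 1
message/second should qualify as 'significant':

// Toy model of the significance conditions described above, NOT Beam's
// MovingFunction implementation: over a 60s sample period updated in 5s
// buckets, 1 msg/sec yields 60 messages across 12 distinct buckets, which
// clears both thresholds (>= 10 messages, >= 2 distinct update periods).
public class SignificanceCheck {
  public static void main(String[] args) {
    int samplePeriodSec = 60;
    int bucketSec = 5;
    int msgsPerSec = 1;

    int totalMessages = samplePeriodSec * msgsPerSec;  // 60
    int distinctBuckets = samplePeriodSec / bucketSec; // 12

    boolean significant = totalMessages >= 10 && distinctBuckets >= 2;
    System.out.println("messages=" + totalMessages
        + " buckets=" + distinctBuckets + " significant=" + significant);
  }
}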
>>>
>>> *From: *Tim Sell 
>>> *Date: *Fri, May 10, 2019 at 4:09 AM
>>> *To: * 
>>>
>>> Hello,
>>>>
>>>> I have identified an issue where the watermark does not advance when
>>>> using the Beam PubSubIO and volumes are very low.
>>>>
>>>> The behaviour is easily replicated if you apply a fixed window that
>>>> triggers after the watermark passes the end of the window.
>>>>
>>>> pipeline.apply(PubsubIO.readStrings().fromSubscription(subscription))
>>>>     .apply(ParDo.of(new ParseScoreEventFn()))
>>>>     .apply(Window.into(FixedWindows.of(Duration.standardSeconds(60)))
>>>>         .triggering(AfterWatermark.pastEndOfWindow())
>>>>         .withAllowedLateness(Duration.standardSeconds(60))
>>>>         .discardingFiredPanes())
>>>>     .apply(MapElements.into(kvs(strings(), integers()))
>>>>         .via(scoreEvent -> KV.of(scoreEvent.getPlayer(), scoreEvent.getScore())))
>>>>     .apply(Count.perKey())
>>>>     .apply(ParDo.of(Log.of("counted per key")));
>>>>
>>>> With this triggering, using both the Flink local runner and the direct
>>>> runner, *no panes will ever be emitted* if the volume of messages in
>>>> Pubsub is very low, e.g. 1 per second.
>>>>
>>>> If I change the triggering to have early firings I get exactly the
>>>> emitted panes that you would expect.
>>>>
>>>> .apply(Window.into(FixedWindows.of(Duration.standardSeconds(60)))
>>>>     .triggering(AfterWatermark.pastEndOfWindow()
>>>>         .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
>>>>             .alignedTo(Duration.standardSeconds(60))))
>>>>     .withAllowedLateness(Duration.standardSeconds(60))
>>>>     .discardingFiredPanes())