Thanks folks. This is really informative!

From: Kenneth Knowles <k...@apache.org>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Friday, April 23, 2021 at 9:34 AM
To: Reuven Lax <re...@google.com>
Cc: user <user@beam.apache.org>, Kenneth Knowles <k...@apache.org>, Kelly Smith 
<kell...@zillowgroup.com>, Lian Jiang <li...@zillowgroup.com>
Subject: Re: Question on late data handling in Beam streaming mode

Reuven's answer will result in a group by key (but not window) where no data is 
dropped and you get deltas for each key. Downstream consumers can recombine the 
deltas to get per-key aggregation. So instead of putting the time interval into 
the window, you put it into the key, and then you get the same grouped 
aggregation.
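
A rough sketch of the "interval in the key" idea (the names, types, and 30-second 
bucket are illustrative only; the aggregation itself would still rely on the 
global-window trigger in Reuven's snippet below to decide when to emit):

// Hypothetical: tag each key with the processing-time bucket the element arrived in,
// so every emitted delta carries its interval and can be re-summed per key downstream.
PCollection<KV<String, Long>> keyedByInterval =
    events.apply(MapElements.via(
        new SimpleFunction<KV<String, Long>, KV<String, Long>>() {
          @Override
          public KV<String, Long> apply(KV<String, Long> kv) {
            long bucketStart = (Instant.now().getMillis() / 30_000L) * 30_000L;
            return KV.of(kv.getKey() + "@" + bucketStart, kv.getValue());
          }
        }));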

There are (at least) two other ways to do this:

1. You can set allowed lateness to a high value.
2. You can use a ParDo and outputWithTimestamp [1] to set the timestamps to 
arrival time; a rough sketch follows below. I illustrated this in some older 
talks [2].
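
A minimal sketch of option 2 (the DoFn name is just for illustration):

class ToArrivalTime<T> extends DoFn<T, T> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    // Re-stamp each element with the current processing (arrival) time. This
    // typically only moves timestamps forward, which a DoFn may always do.
    c.outputWithTimestamp(c.element(), Instant.now());
  }
}

Downstream of this ParDo, ordinary event-time windows effectively become 
arrival-time windows, so nothing is late relative to them.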

Kenn

[1] 
https://github.com/apache/beam/blob/dc636be57900c8ad9b6b9e50b08dad64be8aee40/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L184
[2] 
https://docs.google.com/presentation/d/1smGXb-0GGX_Fid1z3WWzZJWtyBjBA3Mo3t4oeRjJoZI/present?slide=id.g142c2fd96f_0_134

On Fri, Apr 23, 2021 at 8:32 AM Reuven Lax <re...@google.com> wrote:
You can definitely group by processing time. The way to do this in Beam is as 
follows:

Window.<T>into(new GlobalWindows())
    .triggering(
        AfterWatermark.pastEndOfWindow()
            .withEarlyFirings(
                AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(Duration.standardSeconds(30))))
    .discardingFiredPanes();

The syntax is unfortunately a bit wordy, but the idea is that you are creating a 
single event-time window that encompasses all time, and "triggering" an 
aggregation every 30 seconds based on processing time.
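
For context, a sketch of how this might be wired up end to end (the input 
collection, key/value types, and Sum aggregation are placeholders; note the added 
withAllowedLateness, which the Java SDK generally requires once you override the 
trigger - with the global window nothing is ever late, so zero is fine):

PCollection<KV<String, Long>> runningDeltas =
    events
        .apply(Window.<KV<String, Long>>into(new GlobalWindows())
            .triggering(AfterWatermark.pastEndOfWindow()
                .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(Duration.standardSeconds(30))))
            .withAllowedLateness(Duration.ZERO)
            .discardingFiredPanes())
        // Emits a per-key delta on every trigger firing (discarding mode).
        .apply(Sum.longsPerKey());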

On Fri, Apr 23, 2021 at 8:14 AM Tao Li <t...@zillow.com> wrote:
Thanks @Kenneth Knowles. I understand we need to specify a window for the 
group-by so that the app knows when processing is “done” and can output a result.

Is it possible to specify an event arrival/processing-time based window for the 
group-by? The purpose is to avoid dropping late events. With a processing-time 
based window, the app would periodically output the result based on all events 
that arrived in that window, and a late-arriving event would fall into whatever 
window covers its arrival time, so that data would not be lost.

Does Beam support this kind of mechanism? Thanks.

From: Kenneth Knowles <k...@apache.org>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Thursday, April 22, 2021 at 1:49 PM
To: user <user@beam.apache.org>
Cc: Kelly Smith <kell...@zillowgroup.com>, Lian Jiang <li...@zillowgroup.com>
Subject: Re: Question on late data handling in Beam streaming mode

Hello!

In a streaming app, you have two choices: wait forever and never produce any 
output, or use some method to decide that an aggregation is "done".

In Beam, the way you decide that aggregation is "done" is the watermark. When 
the watermark predicts no more data for an aggregation, then the aggregation is 
done. For example GROUP BY <minute> is "done" when no more data will arrive for 
that minute. At this point, your result is produced. More data may arrive, and 
it is ignored. The watermark is determined by the IO connector to be the best 
heuristic available. You can configure "allowed lateness" for an aggregation to 
accept out-of-order data that arrives after the watermark; a rough sketch follows 
below.
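
For example, a minimal sketch of configuring allowed lateness on a one-minute 
window (the durations, trigger, and aggregation choices are illustrative only):

Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardMinutes(1)))
    // Keep each window open for two extra hours of out-of-order data.
    .withAllowedLateness(Duration.standardHours(2))
    .triggering(AfterWatermark.pastEndOfWindow()
        // Emit an updated result whenever late data actually arrives.
        .withLateFirings(AfterPane.elementCountAtLeast(1)))
    .accumulatingFiredPanes();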

Kenn

On Thu, Apr 22, 2021 at 1:26 PM Tao Li <t...@zillow.com> wrote:
Hi Beam community,

I am wondering if there is a risk of losing late data in a Beam streaming app due 
to watermarking?

I just went through this design doc and noticed the “droppable” definition 
there: 
https://docs.google.com/document/d/12r7frmxNickxB5tbpuEh_n35_IJeVZn1peOrBrhhP6Y/edit#

Can you please confirm whether it’s possible for us to lose some data in a 
streaming app in practice? If so, what would be the best practice to avoid data 
loss? Thanks!
