Re: Question on late data handling in Beam streaming mode

2021-04-23 Thread Kenneth Knowles
Reuven's answer will result in a group-by-key (but not by window) where no data is dropped and you get deltas for each key. Downstream consumers can recombine the deltas to get per-key aggregation. So instead of putting the time interval into the window, you put it into the key, and then you get the…
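A minimal sketch of what moving the time interval into the key could look like (the input PCollection, the 30-second bucket size, and the string key format are illustrative assumptions, not from the thread):

import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;
import org.joda.time.Instant;

// Fold a processing-time bucket into the key itself, so grouping happens
// per (key, bucket) pair and no element is ever dropped as "late".
PCollection<KV<String, Long>> rekeyed =
    input.apply(
        MapElements.into(
                TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.longs()))
            .via(
                (KV<String, Long> kv) -> {
                  // Illustrative 30-second processing-time bucket.
                  long bucket =
                      Instant.now().getMillis() / Duration.standardSeconds(30).getMillis();
                  return KV.of(kv.getKey() + "@" + bucket, kv.getValue());
                }));

Grouping or combining on the bucketed key then produces the per-key deltas that downstream consumers can re-aggregate.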

Re: Question on late data handling in Beam streaming mode

2021-04-23 Thread Reuven Lax
You can definitely group by processing time. The way to do this in Beam is as follows: Window.into(new GlobalWindows()) .triggering(AfterWatermark.pastEndOfWindow() .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(30))…
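Reuven's snippet expanded into a compilable sketch (the input PCollection, the Sum combine, discarding-mode accumulation, and zero allowed lateness are assumptions filled in for illustration; discarding fired panes matches the per-key deltas Kenneth mentions):

import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

// Global window that fires early on processing time, every 30 seconds
// after the first element in a pane, and emits each pane as a delta.
PCollection<KV<String, Long>> counts =
    input
        .apply(
            Window.<KV<String, Long>>into(new GlobalWindows())
                .triggering(
                    AfterWatermark.pastEndOfWindow()
                        .withEarlyFirings(
                            AfterProcessingTime.pastFirstElementInPane()
                                .plusDelayOf(Duration.standardSeconds(30))))
                .discardingFiredPanes()
                .withAllowedLateness(Duration.ZERO))
        .apply(Sum.longsPerKey());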

Re: Question on late data handling in Beam streaming mode

2021-04-23 Thread Tao Li
Thanks @Kenneth Knowles. I understand we need to specify a window for the group-by so that the app knows when processing is “done” and can output a result. Is it possible to specify an event-arrival/processing-time based window for the group-by? The purpose is to avoid dropping of late…

Re: How to avoid blocking when decompressing large GZIP files.

2021-04-23 Thread Evan Galpin
Hmm, in my somewhat limited experience, I was not able to combine state and Splittable DoFn. Definitely could be user error on my part though. Re: sequence numbers, could it work to embed those numbers in the CSV itself? Thanks, Evan

Re: How to avoid blocking when decompressing large GZIP files.

2021-04-23 Thread Simon Gauld
Thank you, I will have a look. However, some concerns I have: the gzip itself is not splittable as such, and I need to apply a sequence number 1..n, so I believe the read *must* be sequential. That said, what I am looking to achieve is handing off the newly decorated row as soon as the sequence is…
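A sketch of one way that hand-off could look (this pattern is an assumption, not spelled out in the thread): read and number rows sequentially in a single DoFn, emit each row the moment it is read, then break fusion so the downstream transformation can run in parallel:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.channels.Channels;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Read the file sequentially (required for a 1..n sequence number) but
// hand off each numbered row immediately rather than buffering the file.
class NumberRowsFn extends DoFn<FileIO.ReadableFile, KV<Long, String>> {
  @ProcessElement
  public void process(@Element FileIO.ReadableFile file, OutputReceiver<KV<Long, String>> out)
      throws Exception {
    // open() decompresses according to the file's detected compression (e.g. GZIP).
    try (BufferedReader reader =
        new BufferedReader(
            new InputStreamReader(
                Channels.newInputStream(file.open()), StandardCharsets.UTF_8))) {
      long seq = 0;
      String line;
      while ((line = reader.readLine()) != null) {
        out.output(KV.of(++seq, line)); // emit as soon as the sequence is assigned
      }
    }
  }
}

Applying Reshuffle.viaRandomKey() to the output then lets the runner redistribute the numbered rows before the expensive per-row transformation.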

Re: How to avoid blocking when decompressing large GZIP files.

2021-04-23 Thread Evan Galpin
I could be wrong, but I believe that if your large file is being read by a DoFn, it’s likely that the file is being processed atomically inside that DoFn, which cannot be parallelized further by the runner. One purpose-built way around that constraint is by using Splittable DoFn [1][2], which could…
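For reference, a bare-bones Splittable DoFn skeleton (the KV<filename, size> element type and the per-offset output are illustrative placeholders; a real implementation would read records from the claimed sub-range):

import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.values.KV;

// Splittable DoFn skeleton: the runner may split the OffsetRange
// restriction and process sub-ranges of the same element in parallel.
class ProcessRangesFn extends DoFn<KV<String, Long>, String> {
  @GetInitialRestriction
  public OffsetRange initialRestriction(@Element KV<String, Long> fileAndSize) {
    return new OffsetRange(0, fileAndSize.getValue());
  }

  @NewTracker
  public OffsetRangeTracker newTracker(@Restriction OffsetRange range) {
    return new OffsetRangeTracker(range);
  }

  @ProcessElement
  public void process(
      @Element KV<String, Long> fileAndSize,
      RestrictionTracker<OffsetRange, Long> tracker,
      OutputReceiver<String> out) {
    // Claim offsets one at a time; the runner may checkpoint or split between claims.
    for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
      out.output(fileAndSize.getKey() + ":" + i); // placeholder for real per-offset work
    }
  }
}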

How to avoid blocking when decompressing large GZIP files.

2021-04-23 Thread Simon Gauld
Hello, I am trying to apply a transformation to each row in a reasonably large (1 billion rows) gzip-compressed CSV. The first operation is to assign a sequence number, in this case 1, 2, 3… The second operation is the actual transformation. I would like to apply the sequence number *as* each row is…