Re: Stability of Timer.withOutputTimestamp
It is definitely too new to be stable in the sense of ruling out even tiny changes to the API or runtime compatibility. However, in my opinion it is so fundamental (and overdue) that it will certainly exist in some form. Feel free to use it if you are OK with the possibility of minor compile-time adjustments and you do not require Dataflow pipeline update compatibility.

Kenn

On Wed, Feb 5, 2020 at 10:31 AM Luke Cwik wrote:
> +Reuven Lax
>
> On Wed, Feb 5, 2020 at 7:33 AM Steve Niemitz wrote:
>> Also, as a follow-up, I'm curious about this commit:
>>
>> https://github.com/apache/beam/commit/80862f2de6f224c3a1e7885d197d1ca952ec07e3
>>
>> My use case is that I want to set a timer to fire after the max timestamp of a window, but hold the watermark to the max timestamp until it fires, essentially delaying the window closing by some amount of event time. Prior to that revert commit it seems like that would have been possible, but now it would fail (since the target is after the window's maxTimestamp).
>>
>> What was the reason this was reverted, and are there plans to un-revert it?
>>
>> On Wed, Feb 5, 2020 at 10:01 AM Steve Niemitz wrote:
>>> I noticed that Timer.withOutputTimestamp has landed in 2.19, but I didn't see any mention of it in the release notes.
>>>
>>> Is this feature considered stable (specifically on Dataflow)?
Re: Stability of Timer.withOutputTimestamp
+Reuven Lax

On Wed, Feb 5, 2020 at 7:33 AM Steve Niemitz wrote:
> Also, as a follow-up, I'm curious about this commit:
>
> https://github.com/apache/beam/commit/80862f2de6f224c3a1e7885d197d1ca952ec07e3
>
> My use case is that I want to set a timer to fire after the max timestamp of a window, but hold the watermark to the max timestamp until it fires, essentially delaying the window closing by some amount of event time. Prior to that revert commit it seems like that would have been possible, but now it would fail (since the target is after the window's maxTimestamp).
>
> What was the reason this was reverted, and are there plans to un-revert it?
>
> On Wed, Feb 5, 2020 at 10:01 AM Steve Niemitz wrote:
>> I noticed that Timer.withOutputTimestamp has landed in 2.19, but I didn't see any mention of it in the release notes.
>>
>> Is this feature considered stable (specifically on Dataflow)?
Re: seems beam.util.GroupIntoBatches is not supported in DataFlow. Any alternative?
Yes, you should use BatchElements. Stateful DoFns are not yet supported for Python on Dataflow. (The difference is that GroupIntoBatches has the capability to batch across bundles, which can be important for streaming.)

On Wed, Feb 5, 2020 at 7:53 AM Alan Krumholz wrote:
> OK, it seems like beam.BatchElements(max_batch_size=x) will do the trick for me and runs fine in Dataflow!
>
> On Wed, Feb 5, 2020 at 7:38 AM Alan Krumholz wrote:
>> Actually, beam.GroupIntoBatches() gives me the same error as beam.util.GroupIntoBatches() :( Back to square one.
>>
>> Any other ideas?
>>
>> Thank you!
>>
>> On Wed, Feb 5, 2020 at 7:32 AM Alan Krumholz wrote:
>>> Never mind, there seems to be a beam.GroupIntoBatches() that I should have originally used instead of beam.util.GroupIntoBatches().
>>>
>>> On Wed, Feb 5, 2020 at 7:19 AM Alan Krumholz wrote:
>>>> Hello, I'm having issues running beam.util.GroupIntoBatches() in Dataflow.
>>>>
>>>> I get the following error message:
>>>>
>>>> Exception: Requested execution of a stateful DoFn, but no user state context is available. This likely means that the current runner does not support the execution of stateful DoFns
>>>>
>>>> It seems to be related to:
>>>> https://stackoverflow.com/questions/56403572/no-userstate-context-is-available-google-cloud-dataflow
>>>>
>>>> Is there another way I can achieve the same result using another Beam function? I basically want to batch rows into groups of 100, as it is a lot faster to transform them all at once than one by one. I was also planning to use this function for a custom Snowflake sink (so I could insert many rows at once).
>>>>
>>>> I'm sure there must be another way to do this in Dataflow, but I'm not sure how?
>>>>
>>>> Thanks so much!
Re: seems beam.util.GroupIntoBatches is not supported in DataFlow. Any alternative?
OK, it seems like beam.BatchElements(max_batch_size=x) will do the trick for me and runs fine in Dataflow!

On Wed, Feb 5, 2020 at 7:38 AM Alan Krumholz wrote:
> Actually, beam.GroupIntoBatches() gives me the same error as beam.util.GroupIntoBatches() :( Back to square one.
>
> Any other ideas?
>
> Thank you!
>
> On Wed, Feb 5, 2020 at 7:32 AM Alan Krumholz wrote:
>> Never mind, there seems to be a beam.GroupIntoBatches() that I should have originally used instead of beam.util.GroupIntoBatches().
>>
>> On Wed, Feb 5, 2020 at 7:19 AM Alan Krumholz wrote:
>>> Hello, I'm having issues running beam.util.GroupIntoBatches() in Dataflow.
>>>
>>> I get the following error message:
>>>
>>> Exception: Requested execution of a stateful DoFn, but no user state context is available. This likely means that the current runner does not support the execution of stateful DoFns
>>>
>>> It seems to be related to:
>>> https://stackoverflow.com/questions/56403572/no-userstate-context-is-available-google-cloud-dataflow
>>>
>>> Is there another way I can achieve the same result using another Beam function? I basically want to batch rows into groups of 100, as it is a lot faster to transform them all at once than one by one. I was also planning to use this function for a custom Snowflake sink (so I could insert many rows at once).
>>>
>>> I'm sure there must be another way to do this in Dataflow, but I'm not sure how?
>>>
>>> Thanks so much!
Re: seems beam.util.GroupIntoBatches is not supported in DataFlow. Any alternative?
Actually, beam.GroupIntoBatches() gives me the same error as beam.util.GroupIntoBatches() :( Back to square one.

Any other ideas?

Thank you!

On Wed, Feb 5, 2020 at 7:32 AM Alan Krumholz wrote:
> Never mind, there seems to be a beam.GroupIntoBatches() that I should have originally used instead of beam.util.GroupIntoBatches().
>
> On Wed, Feb 5, 2020 at 7:19 AM Alan Krumholz wrote:
>> Hello, I'm having issues running beam.util.GroupIntoBatches() in Dataflow.
>>
>> I get the following error message:
>>
>> Exception: Requested execution of a stateful DoFn, but no user state context is available. This likely means that the current runner does not support the execution of stateful DoFns
>>
>> It seems to be related to:
>> https://stackoverflow.com/questions/56403572/no-userstate-context-is-available-google-cloud-dataflow
>>
>> Is there another way I can achieve the same result using another Beam function? I basically want to batch rows into groups of 100, as it is a lot faster to transform them all at once than one by one. I was also planning to use this function for a custom Snowflake sink (so I could insert many rows at once).
>>
>> I'm sure there must be another way to do this in Dataflow, but I'm not sure how?
>>
>> Thanks so much!
Re: seems beam.util.GroupIntoBatches is not supported in DataFlow. Any alternative?
Never mind, there seems to be a beam.GroupIntoBatches() that I should have originally used instead of beam.util.GroupIntoBatches().

On Wed, Feb 5, 2020 at 7:19 AM Alan Krumholz wrote:
> Hello, I'm having issues running beam.util.GroupIntoBatches() in Dataflow.
>
> I get the following error message:
>
> Exception: Requested execution of a stateful DoFn, but no user state context is available. This likely means that the current runner does not support the execution of stateful DoFns
>
> It seems to be related to:
> https://stackoverflow.com/questions/56403572/no-userstate-context-is-available-google-cloud-dataflow
>
> Is there another way I can achieve the same result using another Beam function? I basically want to batch rows into groups of 100, as it is a lot faster to transform them all at once than one by one. I was also planning to use this function for a custom Snowflake sink (so I could insert many rows at once).
>
> I'm sure there must be another way to do this in Dataflow, but I'm not sure how?
>
> Thanks so much!
Re: Stability of Timer.withOutputTimestamp
Also, as a follow-up, I'm curious about this commit:

https://github.com/apache/beam/commit/80862f2de6f224c3a1e7885d197d1ca952ec07e3

My use case is that I want to set a timer to fire after the max timestamp of a window, but hold the watermark to the max timestamp until it fires, essentially delaying the window closing by some amount of event time. Prior to that revert commit it seems like that would have been possible, but now it would fail (since the target is after the window's maxTimestamp).

What was the reason this was reverted, and are there plans to un-revert it?

On Wed, Feb 5, 2020 at 10:01 AM Steve Niemitz wrote:
> I noticed that Timer.withOutputTimestamp has landed in 2.19, but I didn't see any mention of it in the release notes.
>
> Is this feature considered stable (specifically on Dataflow)?
seems beam.util.GroupIntoBatches is not supported in DataFlow. Any alternative?
Hello, I'm having issues running beam.util.GroupIntoBatches() in Dataflow.

I get the following error message:

Exception: Requested execution of a stateful DoFn, but no user state context is available. This likely means that the current runner does not support the execution of stateful DoFns

It seems to be related to:
https://stackoverflow.com/questions/56403572/no-userstate-context-is-available-google-cloud-dataflow

Is there another way I can achieve the same result using another Beam function? I basically want to batch rows into groups of 100, as it is a lot faster to transform them all at once than one by one. I was also planning to use this function for a custom Snowflake sink (so I could insert many rows at once).

I'm sure there must be another way to do this in Dataflow, but I'm not sure how?

Thanks so much!
Stability of Timer.withOutputTimestamp
I noticed that Timer.withOutputTimestamp has landed in 2.19, but I didn't see any mention of it in the release notes.

Is this feature considered stable (specifically on Dataflow)?
Re: Dropping expired sessions with Apache Beam
Hi Juliana,

I'm not too familiar with the Python SDK, so I can only give generic advice. The problem you describe seems to be handled well by a stateful DoFn [1][2]: you would hold the last event timestamp per session and, on each incoming event, set a timer for the expiration time (if the timestamp of that event is greater than the greatest seen so far). Once you receive a LOGOUT, you reset this timer and expire the session (probably by clearing the last received event timestamp). Note that events will generally arrive out of order (not sorted by timestamp), so you must keep the maximal timestamp and update it only with events that have a higher timestamp.

> In normal Python I would keep a dict with each session as key and last timestamp as value. For each new entry for a given key I would check the timedelta. If bigger than the window: expired. Otherwise, update the last timestamp. But I don't know how to handle this in Beam.

This is essentially what you should do, just using the stateful API.

Hope this helps,

Jan

[1] https://beam.apache.org/blog/2017/02/13/stateful-processing.html
[2] https://beam.apache.org/releases/pydoc/2.18.0/apache_beam.transforms.userstate.html

On 2/5/20 12:58 AM, Juliana Pereira wrote:

I have a web log file that contains session ids and interactions; there are three interactions: `GET`, `LOGIN`, `LOGOUT`. Something like:

```
00:00:01;session1;GET
00:00:03;session2;LOGIN
00:01:01;session1;LOGOUT
00:03:01;session2;GET
00:08:15;session2;GET
```

and it goes on. I want to be able to identify (right now I'm dealing with bounded data) which sessions were expired. By expired I mean any session that does not have any interaction within a 5-minute interval. Of course, if the user sends a LOGOUT, expiration does not apply. In the data above, session2 should be considered expired.
I have the following Dataflow pipeline:

```
(
    p
    | 'Read Files' >> ReadFromText(known_args.input, coder=LogCoder())
    | ParDo(LogToSession())
    | beam.Map(lambda entry: (entry.session, entry))
    | beam.GroupByKey()
)
```

The `LogCoder()` is responsible for correctly reading the input files. `LogToSession` converts a log line into a Python class that correctly handles the data structure, so I can access properties directly; for example, I can fetch `entry.session`, `entry.timestamp`, or `entry.operation`. Once processed by `LogToSession`, `entry.timestamp` is a Python `datetime`, `entry.session` is a `str`, and `entry.operation` is also a `str`.

In normal Python I would keep a dict with each session as key and last timestamp as value. For each new entry for a given key I would check the timedelta. If bigger than the window: expired. Otherwise, update the last timestamp. But I don't know how to handle this in Beam.

How should I handle the next steps?