Re: Stability of Timer.withOutputTimestamp

2020-02-05 Thread Kenneth Knowles
It is definitely too new to be stable in the sense of guaranteeing no
changes, even tiny ones, to the API or to runtime compatibility.

However, in my opinion it is so fundamental (and overdue) it will certainly
exist in some form.

Feel free to use it if you are OK with the possibility of minor
compile-time adjustments and you do not require Dataflow pipeline update
compatibility.

Kenn

On Wed, Feb 5, 2020 at 10:31 AM Luke Cwik  wrote:

> +Reuven Lax 
>
> On Wed, Feb 5, 2020 at 7:33 AM Steve Niemitz  wrote:
>
>> Also, as a follow-up, I'm curious about this commit:
>>
>> https://github.com/apache/beam/commit/80862f2de6f224c3a1e7885d197d1ca952ec07e3
>>
>> My use case is that I want to set a timer to fire after the max timestamp
>> of a window, but hold the watermark to the max timestamp until it fires,
>> essentially delaying the window closing by some amount of event time.
>> Prior to that revert commit, it seems like that would have been possible,
>> but now it would fail (since the target is after the window's maxTimestamp).
>>
>> What was the reason this was reverted, and are there plans to un-revert
>> it?
>>
>> On Wed, Feb 5, 2020 at 10:01 AM Steve Niemitz 
>> wrote:
>>
>>> I noticed that Timer.withOutputTimestamp has landed in 2.19, but I
>>> didn't see any mention of it in the release notes.
>>>
>>> Is this feature considered stable (specifically on Dataflow)?
>>>
>>


Re: Stability of Timer.withOutputTimestamp

2020-02-05 Thread Luke Cwik
+Reuven Lax 

On Wed, Feb 5, 2020 at 7:33 AM Steve Niemitz  wrote:

> Also, as a follow-up, I'm curious about this commit:
>
> https://github.com/apache/beam/commit/80862f2de6f224c3a1e7885d197d1ca952ec07e3
>
> My use case is that I want to set a timer to fire after the max timestamp
> of a window, but hold the watermark to the max timestamp until it fires,
> essentially delaying the window closing by some amount of event time.
> Prior to that revert commit, it seems like that would have been possible,
> but now it would fail (since the target is after the window's maxTimestamp).
>
> What was the reason this was reverted, and are there plans to un-revert it?
>
> On Wed, Feb 5, 2020 at 10:01 AM Steve Niemitz  wrote:
>
>> I noticed that Timer.withOutputTimestamp has landed in 2.19, but I didn't
>> see any mention of it in the release notes.
>>
>> Is this feature considered stable (specifically on Dataflow)?
>>
>


Re: seems beam.util.GroupIntoBatches is not supported in Dataflow. Any alternative?

2020-02-05 Thread Robert Bradshaw
Yes, you should use BatchElements. Stateful DoFns are not yet
supported for Python Dataflow. (The difference is that
GroupIntoBatches has the capability to batch across bundles, which can
be important for streaming.)
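
For illustration, a minimal sketch of both transforms (the input data and
the choice of key here are made up):

```python
import apache_beam as beam

with beam.Pipeline() as p:
    rows = p | beam.Create([{'id': i} for i in range(1000)])

    # BatchElements batches within each bundle: no keys and no state
    # are needed, so it runs on any runner, including Python Dataflow.
    batched = rows | beam.BatchElements(min_batch_size=10,
                                        max_batch_size=100)

    # GroupIntoBatches requires keyed input and stateful DoFn support,
    # but it can batch across bundles, which matters for streaming.
    keyed_batches = (rows
                     | beam.Map(lambda row: (row['id'] % 10, row))
                     | beam.GroupIntoBatches(100))

    # Each element of `batched` is a list of up to 100 rows.
    _ = batched | beam.Map(len)
```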



On Wed, Feb 5, 2020 at 7:53 AM Alan Krumholz  wrote:
>
> OK, seems like beam.BatchElements(max_batch_size=x) will do the trick for me 
> and runs fine in Dataflow!
>
> On Wed, Feb 5, 2020 at 7:38 AM Alan Krumholz  
> wrote:
>>
>> Actually beam.GroupIntoBatches() gives me the same error as  
>> beam.util.GroupIntoBatches() :(
>> back to square one.
>>
>> Any other ideas?
>>
>> Thank you!
>>
>>
>> On Wed, Feb 5, 2020 at 7:32 AM Alan Krumholz  
>> wrote:
>>>
>>> Never mind, there seems to be a beam.GroupIntoBatches() that I should
>>> have originally used instead of beam.util.GroupIntoBatches()
>>>
>>> On Wed, Feb 5, 2020 at 7:19 AM Alan Krumholz  
>>> wrote:

>>>> Hello, I'm having issues running beam.util.GroupIntoBatches() in Dataflow.
>>>>
>>>> I get the following error message:
>>>>
>>>>> Exception: Requested execution of a stateful DoFn, but no user state
>>>>> context is available. This likely means that the current runner does not
>>>>> support the execution of stateful DoFns
>>>>
>>>> Seems to be related to:
>>>> https://stackoverflow.com/questions/56403572/no-userstate-context-is-available-google-cloud-dataflow
>>>>
>>>> Is there another way I can achieve the same using another Beam function?
>>>>
>>>> I basically want to batch rows into groups of 100, as it is a lot faster to
>>>> transform them all at once than one by one.
>>>>
>>>> I was also planning to use this function for a custom Snowflake sink (so I
>>>> could insert many rows at once).
>>>>
>>>> I'm sure there must be another way to do this in Dataflow, but I'm not sure how.
>>>>
>>>> Thanks so much!


Re: seems beam.util.GroupIntoBatches is not supported in Dataflow. Any alternative?

2020-02-05 Thread Alan Krumholz
OK, seems like beam.BatchElements(max_batch_size=x) will do the trick for
me and runs fine in Dataflow!

On Wed, Feb 5, 2020 at 7:38 AM Alan Krumholz 
wrote:

> Actually beam.GroupIntoBatches() gives me the same error as
> beam.util.GroupIntoBatches() :(
> back to square one.
>
> Any other ideas?
>
> Thank you!
>
>
> On Wed, Feb 5, 2020 at 7:32 AM Alan Krumholz 
> wrote:
>
>> Never mind, there seems to be a beam.GroupIntoBatches() that I should
>> have originally used instead of beam.util.GroupIntoBatches()
>>
>> On Wed, Feb 5, 2020 at 7:19 AM Alan Krumholz 
>> wrote:
>>
>>> Hello, I'm having issues running beam.util.GroupIntoBatches() in
>>> Dataflow.
>>>
>>> I get the following error message:
>>>
>>>> Exception: Requested execution of a stateful DoFn, but no user state
>>>> context is available. This likely means that the current runner does not
>>>> support the execution of stateful DoFns
>>>
>>> Seems to be related to:
>>>
>>> https://stackoverflow.com/questions/56403572/no-userstate-context-is-available-google-cloud-dataflow
>>>
>>> Is there another way I can achieve the same using another Beam function?
>>>
>>> I basically want to batch rows into groups of 100, as it is a lot faster
>>> to transform them all at once than one by one.
>>>
>>> I was also planning to use this function for a custom Snowflake sink (so
>>> I could insert many rows at once).
>>>
>>> I'm sure there must be another way to do this in Dataflow, but I'm not
>>> sure how.
>>>
>>> Thanks so much!
>>>
>>


Re: seems beam.util.GroupIntoBatches is not supported in Dataflow. Any alternative?

2020-02-05 Thread Alan Krumholz
Actually beam.GroupIntoBatches() gives me the same error as
beam.util.GroupIntoBatches() :(
back to square one.

Any other ideas?

Thank you!


On Wed, Feb 5, 2020 at 7:32 AM Alan Krumholz 
wrote:

> Never mind, there seems to be a beam.GroupIntoBatches() that I should
> have originally used instead of beam.util.GroupIntoBatches()
>
> On Wed, Feb 5, 2020 at 7:19 AM Alan Krumholz 
> wrote:
>
>> Hello, I'm having issues running beam.util.GroupIntoBatches() in Dataflow.
>>
>> I get the following error message:
>>
>>> Exception: Requested execution of a stateful DoFn, but no user state
>>> context is available. This likely means that the current runner does not
>>> support the execution of stateful DoFns
>>
>> Seems to be related to:
>>
>> https://stackoverflow.com/questions/56403572/no-userstate-context-is-available-google-cloud-dataflow
>>
>> Is there another way I can achieve the same using another Beam function?
>>
>> I basically want to batch rows into groups of 100, as it is a lot faster
>> to transform them all at once than one by one.
>>
>> I was also planning to use this function for a custom Snowflake sink (so
>> I could insert many rows at once).
>>
>> I'm sure there must be another way to do this in Dataflow, but I'm not
>> sure how.
>>
>> Thanks so much!
>>
>


Re: seems beam.util.GroupIntoBatches is not supported in Dataflow. Any alternative?

2020-02-05 Thread Alan Krumholz
Never mind, there seems to be a beam.GroupIntoBatches() that I should have
originally used instead of beam.util.GroupIntoBatches()

On Wed, Feb 5, 2020 at 7:19 AM Alan Krumholz 
wrote:

> Hello, I'm having issues running beam.util.GroupIntoBatches() in Dataflow.
>
> I get the following error message:
>
>> Exception: Requested execution of a stateful DoFn, but no user state
>> context is available. This likely means that the current runner does not
>> support the execution of stateful DoFns
>
> Seems to be related to:
>
> https://stackoverflow.com/questions/56403572/no-userstate-context-is-available-google-cloud-dataflow
>
> Is there another way I can achieve the same using another Beam function?
>
> I basically want to batch rows into groups of 100, as it is a lot faster to
> transform them all at once than one by one.
>
> I was also planning to use this function for a custom Snowflake sink (so I
> could insert many rows at once).
>
> I'm sure there must be another way to do this in Dataflow, but I'm not sure
> how.
>
> Thanks so much!
>


Re: Stability of Timer.withOutputTimestamp

2020-02-05 Thread Steve Niemitz
Also, as a follow-up, I'm curious about this commit:
https://github.com/apache/beam/commit/80862f2de6f224c3a1e7885d197d1ca952ec07e3

My use case is that I want to set a timer to fire after the max timestamp
of a window, but hold the watermark to the max timestamp until it fires,
essentially delaying the window closing by some amount of event time.
Prior to that revert commit, it seems like that would have been possible,
but now it would fail (since the target is after the window's maxTimestamp).

What was the reason this was reverted, and are there plans to un-revert it?

On Wed, Feb 5, 2020 at 10:01 AM Steve Niemitz  wrote:

> I noticed that Timer.withOutputTimestamp has landed in 2.19, but I didn't
> see any mention of it in the release notes.
>
> Is this feature considered stable (specifically on Dataflow)?
>


seems beam.util.GroupIntoBatches is not supported in Dataflow. Any alternative?

2020-02-05 Thread Alan Krumholz
Hello, I'm having issues running beam.util.GroupIntoBatches() in Dataflow.

I get the following error message:

> Exception: Requested execution of a stateful DoFn, but no user state
> context is available. This likely means that the current runner does not
> support the execution of stateful DoFns

Seems to be related to:
https://stackoverflow.com/questions/56403572/no-userstate-context-is-available-google-cloud-dataflow

Is there another way I can achieve the same using another Beam function?

I basically want to batch rows into groups of 100, as it is a lot faster to
transform them all at once than one by one.

I was also planning to use this function for a custom Snowflake sink (so I
could insert many rows at once).

I'm sure there must be another way to do this in Dataflow, but I'm not sure how.

Thanks so much!


Stability of Timer.withOutputTimestamp

2020-02-05 Thread Steve Niemitz
I noticed that Timer.withOutputTimestamp has landed in 2.19, but I didn't
see any mention of it in the release notes.

Is this feature considered stable (specifically on Dataflow)?


Re: Dropping expired sessions with Apache Beam

2020-02-05 Thread Jan Lukavský

Hi Juliana,

I'm not quite familiar with the Python SDK, so I can only give generic
advice. The problem you describe seems to be handled well by a stateful
DoFn [1], where you would hold the last event timestamp per session and
set a timer on each incoming event to the expiration time (if the
timestamp of that event is greater than the greatest seen so far). Once
you receive LOGOUT, you reset this timer and expire the session
(probably by unsetting the last received event timestamp). Note that
events will generally arrive out of order (not sorted by timestamp), so
you must keep the maximal timestamp and update it only with events that
carry a higher timestamp.


> In normal Python I would keep a dict with each session as key and the
> last timestamp as value. For each new entry of a given key I would
> check the timedelta: if bigger than the window, expired; otherwise,
> update the last timestamp. But I don't know how to handle this in Beam.


This is essentially what you should do, just use the stateful API [2].
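
Roughly, a minimal and untested sketch of that pattern (assuming a runner
and SDK that support Python stateful processing with timers, and that the
pipeline's element timestamps follow the log's event time; the class name,
the gap constant, and the entry fields are illustrative, and the session
id is stashed in state so the timer callback can emit it):

```python
import apache_beam as beam
from apache_beam.coders import StrUtf8Coder, VarIntCoder
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.userstate import (
    CombiningValueStateSpec, TimerSpec, on_timer)
from apache_beam.utils.timestamp import Timestamp

SESSION_GAP_SECS = 5 * 60  # expire after 5 minutes without activity


class DetectExpiredSessions(beam.DoFn):
    # Highest event timestamp seen so far; events may arrive out of
    # order, so only the maximum matters.
    MAX_TS = CombiningValueStateSpec('max_ts', VarIntCoder(), max)
    # Counts LOGOUT events; 0 means the session never logged out.
    LOGGED_OUT = CombiningValueStateSpec('logged_out', VarIntCoder(), sum)
    # Every value added is the same session id, so max() returns it.
    SESSION = CombiningValueStateSpec('session', StrUtf8Coder(), max)
    EXPIRY = TimerSpec('expiry', TimeDomain.WATERMARK)

    def process(self, element,
                max_ts=beam.DoFn.StateParam(MAX_TS),
                logged_out=beam.DoFn.StateParam(LOGGED_OUT),
                session=beam.DoFn.StateParam(SESSION),
                expiry=beam.DoFn.TimerParam(EXPIRY)):
        session_id, entry = element  # input must be keyed by session id
        session.add(session_id)
        if entry.operation == 'LOGOUT':
            logged_out.add(1)
            return
        max_ts.add(int(entry.timestamp.timestamp()))
        # Re-setting the timer replaces the previous deadline, so it
        # always tracks the latest activity plus the expiry gap.
        expiry.set(Timestamp(seconds=max_ts.read() + SESSION_GAP_SECS))

    @on_timer(EXPIRY)
    def expire(self,
               logged_out=beam.DoFn.StateParam(LOGGED_OUT),
               session=beam.DoFn.StateParam(SESSION)):
        if logged_out.read() == 0:  # no LOGOUT before the gap elapsed
            yield session.read()
```

You would apply it to the keyed collection, e.g.
beam.Map(lambda e: (e.session, e)) | beam.ParDo(DetectExpiredSessions()),
and it emits the ids of sessions that went quiet without a LOGOUT. Note
that on bounded input the watermark only advances once all data is read,
so the timer fires once per session at the end; the pattern really pays
off in streaming.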

Hope this helps,

 Jan

[1] https://beam.apache.org/blog/2017/02/13/stateful-processing.html

[2] https://beam.apache.org/releases/pydoc/2.18.0/apache_beam.transforms.userstate.html


On 2/5/20 12:58 AM, Juliana Pereira wrote:
I have a web log file that contains session ids and interactions; there
are three interaction types: `GET`, `LOGIN`, `LOGOUT`. Something like:


```
00:00:01;session1;GET
00:00:03;session2;LOGIN
00:01:01;session1;LOGOUT
00:03:01;session2;GET
00:08:15;session2;GET
```

and so on.

I want to be able to identify (right now I'm dealing with bounded data)
which sessions have expired. By expired I mean any session that does not
have any interaction within a 5-minute interval.


Of course, if the user sends LOGOUT, expiration will not be applied. In
the data above, session2 should be considered expired.


I have the following pipeline:
```
(p
 | 'Read Files' >> ReadFromText(known_args.input, coder=LogCoder())
 | beam.ParDo(LogToSession())
 | beam.Map(lambda entry: (entry.session, entry))
 | beam.GroupByKey()
)
```

`LogCoder()` is responsible for correctly reading the input files.
`LogToSession` converts a log line into a Python class that correctly
models the data structure, making its properties accessible.


For example, I can fetch `entry.session`, `entry.timestamp`, or
`entry.operation`.


Once processed by `LogToSession`, `entry.timestamp` is a Python
`datetime`, while `entry.session` and `entry.operation` are both `str`.


In normal Python I would keep a dict with each session as key and the
last timestamp as value. For each new entry of a given key I would check
the timedelta: if bigger than the window, expired; otherwise, update the
last timestamp. But I don't know how to handle this in Beam.


How should I handle the next steps?