Re: getting windowparam in python

2021-02-24 Thread Manninger, Matyas
That must have been the reason. I removed the extra parameter and now it
works. Thanks for the help.
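
For reference, a minimal sketch of the fixed DoFn described in this
thread, assuming no side input is needed (the unused `extra` parameter
and `*args` are dropped so Beam can bind the DoFn parameters correctly):

import logging
import apache_beam as beam

class DebugWindowInformation(beam.DoFn):
    def process(self, data_item,
                timest=beam.DoFn.TimestampParam,
                windowparam=beam.DoFn.WindowParam):
        # With the stray parameters removed, WindowParam resolves to the
        # element's actual window, so .start and .end are available.
        logging.info('[%s] window: %s-%s message: %s',
                     timest, windowparam.start, windowparam.end, data_item)
        yield data_item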

On Wed, 24 Feb 2021 at 02:54, Yichi Zhang  wrote:

> It seems that your DoFn is expecting a side input. Could you verify that
> you are actually feeding the side input to your DoFn, e.g.
> `beam.ParDo(DebugWindowInformation(), 'extra_info')`? I suspect that the
> missing side input of your DoFn has messed up the argument translation.
>
> On Tue, Feb 23, 2021 at 5:29 PM Ahmet Altay  wrote:
>
>> /cc +Yichi Zhang 
>>
>> On Fri, Feb 19, 2021 at 2:24 AM Manninger, Matyas <
>> matyas.mannin...@veolia.com> wrote:
>>
>>> Dear Beam users,
>>>
>>> I am using the following code to log debug info about my streaming
>>> pipeline:
>>>
>>> class DebugWindowInformation(beam.DoFn):
>>>     def to_runner_api_parameter(self, unused_context):
>>>         pass
>>>
>>>     def process(self, data_item, extra='', timest=beam.DoFn.TimestampParam,
>>>                 windowparam=beam.DoFn.WindowParam, *args):
>>>         import logging
>>>         # GCP does NOT log debug on ROOT level
>>>         print(type(windowparam))
>>>         print(windowparam)
>>>         logging.info(f'[{datetime.datetime.now()}] [{timest}] window: '
>>>                      f'{windowparam.start}-{windowparam.end} message: {extra} {data_item}')
>>>         # logging.info(f'[{datetime.datetime.now()}] [{timest.to_utc_datetime()}] window: '
>>>         #              f'{windowparam.start.to_utc_datetime()}-{windowparam.end.to_utc_datetime()} '
>>>         #              f'message: {extra} {data_item}')
>>>         yield data_item
>>>
>>> Unfortunately I get the following error:
>>>
>>> WindowParam
>>>     logging.info(f'[{datetime.datetime.now()}] [{timest}] window: {windowparam.start}-{windowparam.end} message: {extra} {data_item}')
>>> AttributeError: '_DoFnParam' object has no attribute 'start'
>>>
>>> The code is taken from examples. Does anyone have an idea what might
>>> cause the error?
>>>
>>> Any tip is appreciated,
>>> Matyas
>>>
>>>
>>>


Re: Reshuffle stuck in python API

2021-02-23 Thread Manninger, Matyas
Yes, apparently it might have been an error related to the Dataflow runner;
Google is investigating the case. They gave a few suggestions that solved
the issue, but I am not sure which one was the real solution, or why.

On Wed, 24 Feb 2021 at 03:11, Yichi Zhang  wrote:

> Reshuffle doesn't change your windowing or grouping; it simply
> redistributes the elements to different workers. The output should match
> the input of the Reshuffle step. Are you seeing fewer elements coming out
> of Reshuffle compared to the input?
>
> On Wed, Feb 17, 2021 at 9:11 AM Manninger, Matyas <
> matyas.mannin...@veolia.com> wrote:
>
>> Dear Beam users,
>>
>> I have a problem running a Python pipeline in Dataflow. Because of the
>> many side inputs and a complicated architecture, Google told us that their
>> optimization algorithm gets messed up and that adding a Reshuffle to the
>> pipeline solves the issue. Unfortunately, it seems like the Reshuffle step
>> is not working properly. I added a 60 sec fixed window in front of it, as
>> this is a streaming pipeline. It seems like elements get added to the step
>> but remain grouped or something like that, as only very few elements come
>> out of the step. Any ideas what I might be doing wrong? The code is very
>> long and complicated, so I'd rather not share it, but are there any
>> typical mistakes regarding reshuffling?
>>
>> Thanks for any tips,
>> Matyas
>>
>
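
For reference, a minimal sketch of the window-then-reshuffle arrangement
discussed in this thread (the Create source below is just a stand-in for
the real streaming input):

import apache_beam as beam
from apache_beam.transforms import window

with beam.Pipeline() as p:
    # Reshuffle only redistributes elements across workers; it changes
    # neither the windowing nor the grouping of the collection.
    fanned_out = (p
                  | beam.Create(range(10))
                  | 'Window60s' >> beam.WindowInto(window.FixedWindows(60))
                  | 'Redistribute' >> beam.Reshuffle())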


getting windowparam in python

2021-02-19 Thread Manninger, Matyas
Dear Beam users,

I am using the following code to log debug info about my streaming pipeline:

class DebugWindowInformation(beam.DoFn):
    def to_runner_api_parameter(self, unused_context):
        pass

    def process(self, data_item, extra='', timest=beam.DoFn.TimestampParam,
                windowparam=beam.DoFn.WindowParam, *args):
        import logging
        # GCP does NOT log debug on ROOT level
        print(type(windowparam))
        print(windowparam)
        logging.info(f'[{datetime.datetime.now()}] [{timest}] window: '
                     f'{windowparam.start}-{windowparam.end} message: {extra} {data_item}')
        # logging.info(f'[{datetime.datetime.now()}] [{timest.to_utc_datetime()}] window: '
        #              f'{windowparam.start.to_utc_datetime()}-{windowparam.end.to_utc_datetime()} '
        #              f'message: {extra} {data_item}')
        yield data_item

Unfortunately I get the following error:

WindowParam
    logging.info(f'[{datetime.datetime.now()}] [{timest}] window: {windowparam.start}-{windowparam.end} message: {extra} {data_item}')
AttributeError: '_DoFnParam' object has no attribute 'start'

The code is taken from examples. Does anyone have an idea what might cause
the error?

Any tip is appreciated,
Matyas


Reshuffle stuck in python API

2021-02-17 Thread Manninger, Matyas
Dear Beam users,

I have a problem running a Python pipeline in Dataflow. Because of the many
side inputs and a complicated architecture, Google told us that their
optimization algorithm gets messed up and that adding a Reshuffle to the
pipeline solves the issue. Unfortunately, it seems like the Reshuffle step
is not working properly. I added a 60 sec fixed window in front of it, as
this is a streaming pipeline. It seems like elements get added to the step
but remain grouped or something like that, as only very few elements come
out of the step. Any ideas what I might be doing wrong? The code is very
long and complicated, so I'd rather not share it, but are there any typical
mistakes regarding reshuffling?

Thanks for any tips,
Matyas


Re: Is there an array explode function/transform?

2021-01-13 Thread Manninger, Matyas
I would also not unnest arrays nested in arrays, just the top-level array of
the specified fields.

On Wed, 13 Jan 2021 at 20:58, Reuven Lax  wrote:

> Nested fields are not part of standard SQL AFAIK. Beam goes further and
> supports array of array, etc.
>
> On Wed, Jan 13, 2021 at 11:42 AM Kenneth Knowles  wrote:
>
>> Just the fields specified, IMO. When in doubt, copy SQL. (and I mean SQL
>> generally, not just Beam SQL)
>>
>> Kenn
>>
>> On Wed, Jan 13, 2021 at 11:17 AM Reuven Lax  wrote:
>>
>>> Definitely could be a top-level transform. Should it automatically
>>> unnest all arrays, or just the fields specified?
>>>
>>> We do have to define the semantics for nested arrays as well.
>>>
>>> On Wed, Jan 13, 2021 at 10:57 AM Robert Bradshaw 
>>> wrote:
>>>
 Ah, thanks for the clarification. UNNEST does sound like what you want
 here, and would likely make sense as a top-level relational transform as
 well as being supported by SQL.

 On Wed, Jan 13, 2021 at 10:53 AM Tao Li  wrote:

> @Kyle Weaver  sure thing! So the input/output
> definition for Flatten.Iterables is:
>
> Input: PCollection<Iterable<T>>
>
> Output: PCollection<T>
>
>
>
> The input/output for an explode transform would look like this:
>
> Input: PCollection<Row>, where the row schema has a field which is an
> array of T
>
> Output: PCollection<Row>, where the array-type field from the input schema
> is replaced with a new field of type T. The elements from the array-type
> field are flattened into multiple rows in the new table (other fields of
> the input table are just duplicated).
>
>
>
> Hope this clarification helps!
>
>
>
> *From: *Kyle Weaver 
> *Reply-To: *"user@beam.apache.org" 
> *Date: *Tuesday, January 12, 2021 at 4:58 PM
> *To: *"user@beam.apache.org" 
> *Cc: *Reuven Lax 
> *Subject: *Re: Is there an array explode function/transform?
>
>
>
> @Reuven Lax  yes I am aware of that transform, but
> that’s different from the explode operation I was referring to:
> https://spark.apache.org/docs/latest/api/sql/index.html#explode
> 
>
>
>
> How is it different? It'd help if you could provide the signature
> (input and output PCollection types) of the transform you have in mind.
>
>
>
> On Tue, Jan 12, 2021 at 4:49 PM Tao Li  wrote:
>
> @Reuven Lax  yes I am aware of that transform, but
> that’s different from the explode operation I was referring to:
> https://spark.apache.org/docs/latest/api/sql/index.html#explode
> 
>
>
>
> *From: *Reuven Lax 
> *Reply-To: *"user@beam.apache.org" 
> *Date: *Tuesday, January 12, 2021 at 2:04 PM
> *To: *user 
> *Subject: *Re: Is there an array explode function/transform?
>
>
>
> Have you tried Flatten.iterables?
>
>
>
> On Tue, Jan 12, 2021, 2:02 PM Tao Li  wrote:
>
> Hi community,
>
>
>
> Is there a Beam function to explode an array (similar to Spark SQL’s
> explode())? I did some research but did not find anything.
>
>
>
> BTW I think we can potentially use FlatMap to implement the explode
> functionality, but a Beam-provided function would be very handy.
>
>
>
> Thanks a lot!
>
>
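
A minimal FlatMap-based sketch of the explode semantics described in this
thread, using plain dict rows (the schema and the 'tags' field name are
made up for illustration):

import apache_beam as beam

def explode(row, field):
    # One output row per element of the array field; all other fields
    # are duplicated unchanged.
    for value in row[field]:
        yield {**row, field: value}

with beam.Pipeline() as p:
    exploded = (p
                | beam.Create([{'id': 1, 'tags': ['a', 'b']}])
                | beam.FlatMap(explode, field='tags')
                | beam.Map(print))
    # Prints {'id': 1, 'tags': 'a'} and {'id': 1, 'tags': 'b'}.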


Re: Submit bug report

2021-01-12 Thread Manninger, Matyas
Thanks Alexey

On Tue, 12 Jan 2021 at 12:31, Alexey Romanenko 
wrote:

> Hi Matyas,
>
> If you believe it’s a bug, please submit an issue on Beam Jira [1], then
> create a PR (if you have a fix for it) on the Beam GitHub repository [2]
> and ask someone who has contributed most to that part of the code to do a
> PR review.
>
> You can find more details about contributing to Beam in the Contribution
> Guide [3].
>
> Regards,
> Alexey
>
> [1] https://issues.apache.org/jira/projects/BEAM/issues
> [2] https://github.com/apache/beam
> [3] https://beam.apache.org/contribute/
>
> > On 12 Jan 2021, at 09:17, Manninger, Matyas 
> wrote:
> >
> > Dear All,
> >
> > What is the best way to submit a bug report? I can't seem to do it on
> > GitHub. I have written several emails about how the periodic sequence
> > doesn't work in Python, but all those emails were ignored here. It would
> > be a very easy fix.
> >
> > Thanks for any answer
>
>


Submit bug report

2021-01-12 Thread Manninger, Matyas
Dear All,

What is the best way to submit a bug report? I can't seem to do it on
GitHub. I have written several emails about how the periodic sequence
doesn't work in Python, but all those emails were ignored here. It would be
a very easy fix.

Thanks for any answer


Re: Side input in streaming

2021-01-08 Thread Manninger, Matyas
Dear Kenn,

Thanks again, that pattern was my initial plan, but there seems to be a bug
in the Python API, in periodicsequence.py on line 42: "total_outputs =
math.ceil((end - start) / interval)". Here end, start, and interval are all
Durations, and the / operator is not defined for the Duration class. I
already wrote an email about this to this list, but I didn't get a
satisfactory answer on how to get around this issue, so now I am trying to
avoid using PeriodicImpulse. If you have any other suggestions I would
highly appreciate it.

On Thu, 7 Jan 2021 at 18:29, Kenneth Knowles  wrote:

> Actually, if you want to re-read the BQ table, then you need
> something more, following the pattern here:
> https://beam.apache.org/documentation/patterns/side-inputs/. There are
> two variations on the page there, and these do not use triggers but instead
> the read from BigQuery at the beginning of the 24 hours is used by all of
> the main input elements for the whole 24 hour window of the main input. The
> general pattern is PeriodicImpulse --> ParDo(convert impulse to read spec)
> --> ReadAll
>
> I realize you did not specify what language you are using. The ReadAll
> transform only exists for BigQuery in Python right now, and it is not yet
> easy to use it from Java (plus you may not want to).
>
> Kenn
>
> On Thu, Jan 7, 2021 at 12:36 AM Manninger, Matyas <
> matyas.mannin...@veolia.com> wrote:
>
>> Thanks Kenn for the clear explanation. Very helpful. I am trying to read
>> a small BQ table as a side input and refresh it every 24 hours or so, but I
>> still want the main stream to be processed during that time. Is there a
>> better way to do this than a 24 hour window with 1 minute triggers on
>> the side input? Maybe just restarting the job every 24 hours and reading the
>> side input on setup would be the best option.
>>
>> On Tue, 5 Jan 2021 at 17:53, Kenneth Knowles  wrote:
>>
>>> You have it basically right. However, there are a couple minor
>>> clarifications:
>>>
>>> 1. A particular window on the side input is not "ready" until there has
>>> been some element output to it (or it has expired, which will make it the
>>> default value). Main input elements will wait for the side input to be
>>> ready. If you configure triggering on the side input, then the first
>>> triggering will make it "ready". Of course, this means that the value you
>>> will read will be an incomplete view of the data. If you have a 24 hour window
>>> with triggering set up then the value that is read will be whatever the
>>> most recent trigger is, but with some caching delay.
>>> 2. None of the "time" that you are talking about is real time. It is all
>>> event time so it is controlled by the side input and main input watermarks.
>>> Of course in streaming these are usually close to real time so yes on
>>> average what you describe is probably right.
>>>
>>> It sounds like you want a side input with a trigger on it, if you want
>>> to read it before you have all the data. This is highly nondeterministic so
>>> you want to be sure that you do not require exact answers on the side input.
>>>
>>> Kenn
>>>
>>> On Tue, Jan 5, 2021 at 6:56 AM Manninger, Matyas <
>>> matyas.mannin...@veolia.com> wrote:
>>>
>>>> Dear Beam users,
>>>>
>>>> Can someone clarify for me how side inputs work in streaming? If I use a
>>>> stream as a side input to my main stream, each element will be paired
>>>> with a side input from the corresponding time window. Does this mean that
>>>> the element will not be processed until the appropriate window on the
>>>> side input stream is closed? So if my side input is windowed into 24 hour
>>>> windows, will my elements from the main stream be processed only every 24
>>>> hours? If not, then if the window is triggered for the side input at
>>>> 12:00 and the input actually only arrives at 12:05, will all elements
>>>> from the main stream processed between 12:00 and 12:05 be matched with an
>>>> empty side input?
>>>>
>>>> Any clarification is appreciated.
>>>>
>>>> Best regards,
>>>> Matyas
>>>>
>>>
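
A sketch of the PeriodicImpulse --> read-spec --> ReadAll pattern Kenn
describes above, following the side-input patterns page. The table name is
hypothetical, and ReadAllFromBigQuery/ReadFromBigQueryRequest availability
depends on the Beam version:

import apache_beam as beam
from apache_beam.io.gcp.bigquery import ReadAllFromBigQuery, ReadFromBigQueryRequest
from apache_beam.transforms.periodicsequence import PeriodicImpulse

with beam.Pipeline() as p:
    # Fire once every 24 hours; apply_windowing=True places each impulse
    # (and hence each fresh read) in its own fixed window.
    side = (p
            | 'Every24h' >> PeriodicImpulse(fire_interval=24 * 60 * 60,
                                            apply_windowing=True)
            | 'ToReadSpec' >> beam.Map(lambda _: ReadFromBigQueryRequest(
                query='SELECT * FROM `my_project.my_dataset.lookup`',
                use_standard_sql=True))
            | 'ReadTable' >> ReadAllFromBigQuery())
    # The main input must be windowed compatibly before consuming
    # beam.pvalue.AsList(side) as a side input.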


Re: Side input in streaming

2021-01-07 Thread Manninger, Matyas
Thanks Kenn for the clear explanation. Very helpful. I am trying to read a
small BQ table as a side input and refresh it every 24 hours or so, but I
still want the main stream to be processed during that time. Is there a
better way to do this than a 24 hour window with 1 minute triggers on the
side input? Maybe just restarting the job every 24 hours and reading the
side input on setup would be the best option.

On Tue, 5 Jan 2021 at 17:53, Kenneth Knowles  wrote:

> You have it basically right. However, there are a couple minor
> clarifications:
>
> 1. A particular window on the side input is not "ready" until there has
> been some element output to it (or it has expired, which will make it the
> default value). Main input elements will wait for the side input to be
> ready. If you configure triggering on the side input, then the first
> triggering will make it "ready". Of course, this means that the value you
> will read will be an incomplete view of the data. If you have a 24 hour window
> with triggering set up then the value that is read will be whatever the
> most recent trigger is, but with some caching delay.
> 2. None of the "time" that you are talking about is real time. It is all
> event time so it is controlled by the side input and main input watermarks.
> Of course in streaming these are usually close to real time so yes on
> average what you describe is probably right.
>
> It sounds like you want a side input with a trigger on it, if you want to
> read it before you have all the data. This is highly nondeterministic so
> you want to be sure that you do not require exact answers on the side input.
>
> Kenn
>
> On Tue, Jan 5, 2021 at 6:56 AM Manninger, Matyas <
> matyas.mannin...@veolia.com> wrote:
>
>> Dear Beam users,
>>
>> Can someone clarify for me how side inputs work in streaming? If I use a
>> stream as a side input to my main stream, each element will be paired with
>> a side input from the corresponding time window. Does this mean that the
>> element will not be processed until the appropriate window on the side
>> input stream is closed? So if my side input is windowed into 24 hour
>> windows, will my elements from the main stream be processed only every 24
>> hours? If not, then if the window is triggered for the side input at 12:00
>> and the input actually only arrives at 12:05, will all elements from the
>> main stream processed between 12:00 and 12:05 be matched with an empty
>> side input?
>>
>> Any clarification is appreciated.
>>
>> Best regards,
>> Matyas
>>
>


Side input in streaming

2021-01-05 Thread Manninger, Matyas
Dear Beam users,

Can someone clarify for me how side inputs work in streaming? If I use a
stream as a side input to my main stream, each element will be paired with a
side input from the corresponding time window. Does this mean that the
element will not be processed until the appropriate window on the side input
stream is closed? So if my side input is windowed into 24 hour windows, will
my elements from the main stream be processed only every 24 hours? If not,
then if the window is triggered for the side input at 12:00 and the input
actually only arrives at 12:05, will all elements from the main stream
processed between 12:00 and 12:05 be matched with an empty side input?

Any clarification is appreciated.

Best regards,
Matyas


Re: Session window as sideinput

2021-01-04 Thread Manninger, Matyas
Dear Reza,

Thanks for the suggestion; that is the solution I was going for, but
unfortunately PeriodicImpulse has some bugs. I also posted a question about
that to this mailing list, but no success there so far, so I am looking for
alternatives.

On Tue, 22 Dec 2020 at 12:36, Reza Ardeshir Rokni  wrote:

> Hi,
>
> Why the need for session windows? Could you make use of a Global Window
> for the side input, as per the following pattern:
>
> https://beam.apache.org/documentation/patterns/side-inputs/
>
> Cheers
> Reza
>
>
>
> On Tue, 22 Dec 2020 at 01:17, Manninger, Matyas <
> matyas.mannin...@veolia.com> wrote:
>
>> Dear Beam users,
>>
>> I am writing a streaming pipeline that has static tables as side inputs.
>> These tables change from time to time, and I want the side inputs to be
>> updated at certain intervals. I am planning on triggering the update by
>> sending a message through Pub/Sub: when a new message arrives, the side
>> input should be updated. I would like to do this with session windows, but
>> on the Beam documentation page session windows are depicted as lasting from
>> the first input in the window to the last input in the window, and the gap
>> seems to not belong to any window. So if I used this as a side input, how
>> would my main stream be matched to windows? If I send a signal every day
>> and the gap is set to 1 hour, for example, would the window close after 1
>> hour, and for the next 23 hours would all the elements in the main stream
>> be matched to no side input?
>>
>> Thanks for any help or tips on how to solve this or what is the expected
>> behaviour.
>>
>> BR,
>> Matyas Manninger
>>
>
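
A sketch of the global-window approach Reza suggests, applied to the
Pub/Sub-triggered refresh described above. The topic name and the fetch
helper are hypothetical; with a repeated AfterCount(1) trigger in
discarding mode, the most recent firing becomes the side input value:

import apache_beam as beam
from apache_beam.transforms import trigger, window

def fetch_lookup_table(_):
    # Hypothetical helper: re-read the static table and return it as a
    # single value (e.g. a dict keyed by id).
    return {}

with beam.Pipeline() as p:  # run with streaming=True in practice
    side = (p
            | 'RefreshSignal' >> beam.io.ReadFromPubSub(
                topic='projects/my-project/topics/refresh')
            | 'FetchTable' >> beam.Map(fetch_lookup_table)
            | 'GlobalSide' >> beam.WindowInto(
                window.GlobalWindows(),
                trigger=trigger.Repeatedly(trigger.AfterCount(1)),
                accumulation_mode=trigger.AccumulationMode.DISCARDING))
    # Main-stream DoFns can then take beam.pvalue.AsSingleton(side); each
    # new Pub/Sub message refreshes the value without closing any window.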


Re: periodic impulse bug

2020-12-21 Thread Manninger, Matyas
Hey Ning,

Thanks for the answer. Shouldn't the start and end be timestamps? If I set
them to ints manually, what value corresponds to "now" for start? I assume a
max int would be the end if I never want it to end?

On Mon, 21 Dec 2020 at 18:45, Ning Kang  wrote:

> I also ran into this issue some time ago. Couldn't figure out why, but
> explicitly setting the end and start to some integer value when building
> the `PeriodicImpulse` transform could be a workaround.
>
> On Mon, Dec 21, 2020 at 4:19 AM Manninger, Matyas <
> matyas.mannin...@veolia.com> wrote:
>
>> Dear Beam users,
>>
>> In the Python SDK I tried using PeriodicImpulse, but it seems like there
>> is an internal bug. In periodicsequence.py, on line 42, there is a
>> division where one Duration is divided by another, but no division
>> operator is defined for that class. What am I missing? Is there a
>> workaround for this? Here is the error message that led to this
>> "discovery":
>> File
>> "/home/user/.virtualenvs/dflowenv/lib/python3.8/site-packages/apache_beam/transforms/periodicsequence.py",
>> line 42, in initial_restriction
>>     total_outputs = math.ceil((end - start) / interval)
>> TypeError: unsupported operand type(s) for /: 'Duration' and 'Duration'
>> [while running 'read/periodic_impulse/GenSequence/PairWithRestriction']
>>
>>
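
For reference, a sketch of the workaround Ning describes above, assuming
that plain integers (seconds since epoch) keep the internal
(end - start) / interval arithmetic in ordinary numbers. The parameter
names follow the PeriodicImpulse signature; the stop value is an arbitrary
far-future stand-in for "never":

import time

import apache_beam as beam
from apache_beam.transforms.periodicsequence import PeriodicImpulse

now = int(time.time())                   # "now" as plain unix seconds
far_future = now + 10 * 365 * 24 * 3600  # roughly ten years out

with beam.Pipeline() as p:
    impulses = p | PeriodicImpulse(start_timestamp=now,
                                   stop_timestamp=far_future,
                                   fire_interval=60,
                                   apply_windowing=True)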


Session window as sideinput

2020-12-21 Thread Manninger, Matyas
Dear Beam users,

I am writing a streaming pipeline that has static tables as side inputs.
These tables change from time to time, and I want the side inputs to be
updated at certain intervals. I am planning on triggering the update by
sending a message through Pub/Sub: when a new message arrives, the side
input should be updated. I would like to do this with session windows, but
on the Beam documentation page session windows are depicted as lasting from
the first input in the window to the last input in the window, and the gap
seems to not belong to any window. So if I used this as a side input, how
would my main stream be matched to windows? If I send a signal every day and
the gap is set to 1 hour, for example, would the window close after 1 hour,
and for the next 23 hours would all the elements in the main stream be
matched to no side input?

Thanks for any help or tips on how to solve this or what is the expected
behaviour.

BR,
Matyas Manninger


periodic impulse bug

2020-12-21 Thread Manninger, Matyas
Dear Beam users,

In the Python SDK I tried using PeriodicImpulse, but it seems like there is
an internal bug. In periodicsequence.py, on line 42, there is a division
where one Duration is divided by another, but no division operator is
defined for that class. What am I missing? Is there a workaround for this?
Here is the error message that led to this "discovery":
File
"/home/user/.virtualenvs/dflowenv/lib/python3.8/site-packages/apache_beam/transforms/periodicsequence.py",
line 42, in initial_restriction
    total_outputs = math.ceil((end - start) / interval)
TypeError: unsupported operand type(s) for /: 'Duration' and 'Duration'
[while running 'read/periodic_impulse/GenSequence/PairWithRestriction']
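
A minimal repro of the failing arithmetic in the affected release (the
constructor arguments are illustrative):

from apache_beam.utils.timestamp import Duration

# Mirrors line 42 of periodicsequence.py: dividing one Duration by
# another. In the affected release this raises:
# TypeError: unsupported operand type(s) for /: 'Duration' and 'Duration'
Duration(seconds=120) / Duration(seconds=60)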


error with periodic impulse

2020-12-17 Thread Manninger, Matyas
Dear Beam users,

I am trying to write a pipeline that would use a BigQuery table as a side
input, but I want Beam to refresh the content in memory periodically. For
this, I am using the recommended periodic impulse and then a read from
BigQuery.
I have defined the following class for this:

class BigQuerySideinput(beam.PTransform):
    def __init__(self, query, refresh_interval):
        super(BigQuerySideinput, self).__init__()
        self.query = query
        self.refresh_interval = refresh_interval

    def expand(self, pcoll):
        return (pcoll
                | 'periodic_impulse' >> beam.transforms.periodicsequence.PeriodicImpulse(
                    fire_interval=self.refresh_interval, apply_windowing=True)
                | 'inject_query' >> beam.Map(lambda x: self.query)
                | 'run_query' >> beam.ParDo(BigQueryRead()))


Here BigQueryRead() is a DoFn that takes the query and reads the data. When
I try to run a pipeline with this step in it, I get the following error:

> RuntimeError: Transform node
> AppliedPTransform(EmployeeLookup/Employee:Collect/periodic_impulse/GenSequence/ProcessKeyedElements/GroupByKey/GroupByKey,
> _GroupByKeyOnly) was not replaced as expected.

Could anyone help with an idea of what might cause the issue and how to fix
it? Also, if there is a better way to get the same behavior, I am open to
those suggestions as well. Thanks in advance. (If more details are needed, I
would be happy to provide them.)

Best regards,
Matyas Manninger