Re: SQL massively more resource-intensive? Memory leak?

2019-06-03 Thread deklanw
Yes, it runs, but I think it has the same problem. The DirectRunner is a lot slower, 
so it took me hours of running it, but by the end the memory usage was high and the 
CPU was at about 100%, so it looks like the same issue.

Worth noting perhaps that when I use the DirectRunner I have to turn 
enforceImmutability off because of 
https://issues.apache.org/jira/browse/BEAM-1714
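
For reference, roughly how I turn that check off (a sketch, assuming the
DirectOptions interface from the direct Java runner; the rest of the pipeline
stays the same):

import org.apache.beam.runners.direct.DirectOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class DirectRunnerWorkaround {
  public static void main(String[] args) {
    DirectOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(DirectOptions.class);
    // Workaround for BEAM-1714: skip the DirectRunner's immutability enforcement.
    options.setEnforceImmutability(false);

    Pipeline pipeline = Pipeline.create(options);
    // ... build the same Kafka -> Row -> SqlTransform pipeline here ...
    pipeline.run().waitUntilFinish();
  }
}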

On 2019/06/03 17:48:30, Rui Wang  wrote: 
> Ha, sorry, I was only reading the screenshots and missed your other comments. So
> the Count fn did indeed work.
> 
> Can I ask whether your SQL pipeline works on the direct runner?
> 
> 
> -Rui
> 
> On Mon, Jun 3, 2019 at 10:39 AM Rui Wang  wrote:
> 
> > BeamSQL actually converts a SELECT COUNT(*) query into a Java pipeline
> > that just calls the Java SDK's built-in Count[1] transform.
> >
> > Could you implement your pipeline with the Count transform to see whether this
> > memory issue still exists? Doing so would narrow the problem down a bit: if the
> > pipeline works when you use Java directly, without going through the SQL code
> > path, we will know that BeamSQL does not generate a working pipeline for your
> > SELECT COUNT(*) query.
> >
> >
> > [1]:
> > https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java#L54
> >
> > On Sat, Jun 1, 2019 at 9:30 AM dekl...@gmail.com 
> > wrote:
> >
> >> Is using Beam SQL just massively more resource-intensive or is there a
> >> bug somewhere here (in my code or elsewhere)? (I'm using Flink runner)
> >>
> >> Here is my code:
> >> https://pastebin.com/nNtc9ZaG
> >>
> >> Here is the error I get (truncated at the end because it's so long and
> >> seemingly repetitive) when I run the SQL transform and my memory/CPU usage
> >> skyrockets:
> >> https://pastebin.com/mywmkCQi
> >>
> >> For example,
> >>
> >> After several early firing triggers, 13-15% CPU, 1.5GB-2GB RAM,
> >> everything working fine:
> >>
> >> `rowStream.apply(Combine.globally(Count.combineFn()).withoutDefaults()).apply(myPrint());`
> >>
> >> After a single early firing trigger, CPU usage shoots to 90%+, 4.7GB+
> >> RAM, soon crashes:
> >> `rowStream.apply(SqlTransform.query("SELECT COUNT(*) FROM
> >> PCOLLECTION")).apply(myPrint());`
> >>
> >> I can't imagine this is expected behavior but maybe I'm just ignorant of
> >> how SQL is implemented.
> >>
> >> Most of this code is just getting Schema Registry Avro Kafka messages
> >> into a Row stream. There have been discussions on the mailing list recently
> >> about how to do that. This is the best I could do. If that part is
> >> incorrect I'd be glad to know.
> >>
> >> Any help appreciated. Thank you.
> >>
> >>
> 


Re: send Avro messages to Kafka using Beam

2019-06-03 Thread Yohei Onishi
I think that might be possible too.

Yohei Onishi


On Tue, Jun 4, 2019 at 9:00 AM Chamikara Jayalath 
wrote:

>
>
> On Mon, Jun 3, 2019 at 5:14 PM Yohei Onishi  wrote:
>
>> Hi Nicolas,
>>
>> Are you running your job on Dataflow? According to GCP support, Dataflow
>> currently does not support Schema Registry. But you can still use Schema
>> Registry to serialize / deserialize your messages using a
>> custom KafkaAvroSerializer.
>>
>> In my case I implemented a custom KafkaAvroDeserializer to deserialize
>> Avro-format messages.
>> You can implement a custom KafkaAvroSerializer in a similar way.
>>
>> https://stackoverflow.com/questions/54755668/how-to-deserialising-kafka-avro-messages-using-apache-beam/55917157#55917157
>>
>> I hope this helps you.
>>
>> Yohei Onishi
>>
>>
>> On Wed, May 29, 2019 at 5:44 PM Nicolas Delsaux 
>> wrote:
>>
>>> Hello all
>>>
>>> I have a Beam job that I use to read messages from RabbitMQ and write them
>>> to Kafka.
>>>
>>> As of now, messages are read/written as JSON.
>>>
>>> Obviously, that is not an optimal storage format, so I would like to transform
>>> the messages to Avro before writing them to Kafka. I have the URL of a
>>> schema registry I can use to store/get my schema.
>>>
>>
> Can you simply have a ParDo that sits between the read and write steps and
> performs the conversion (and also connects to the schema registry)?
>
>
>>
>>> But I see nowhere in the Beam documentation how to transform my JSON into
>>> Avro data (except by deserializing my JSON to a Java class that I will
>>> later transform into Avro). Is that deserialization to a class the only
>>> way, or is it possible to generate an Avro GenericRecord from my JSON
>>> "directly"?
>>>
>>> Once my Avro data is generated, how can I write it to my Kafka topic?
>>>
>>>
>>> Thanks !
>>>
>>>


Re: send Avro messages to Kafka using Beam

2019-06-03 Thread Chamikara Jayalath
On Mon, Jun 3, 2019 at 5:14 PM Yohei Onishi  wrote:

> Hi Nicolas,
>
> Are you running your job on Dataflow? According to GCP support, Dataflow
> currently does not support Schema Registry. But you can still use Schema
> Registry to serialize / deserialize your messages using a
> custom KafkaAvroSerializer.
>
> In my case I implemented a custom KafkaAvroDeserializer to deserialize
> Avro-format messages.
> You can implement a custom KafkaAvroSerializer in a similar way.
>
> https://stackoverflow.com/questions/54755668/how-to-deserialising-kafka-avro-messages-using-apache-beam/55917157#55917157
>
> I hope this helps you.
>
> Yohei Onishi
>
>
> On Wed, May 29, 2019 at 5:44 PM Nicolas Delsaux 
> wrote:
>
>> Hello all
>>
>> I have a Beam job that I use to read messages from RabbitMQ and write them
>> to Kafka.
>>
>> As of now, messages are read/written as JSON.
>>
>> Obviously, that is not an optimal storage format, so I would like to transform
>> the messages to Avro before writing them to Kafka. I have the URL of a
>> schema registry I can use to store/get my schema.
>>
>
Can you simply have a ParDo that sits between the read and write steps and
performs the conversion (and also connects to the schema registry)?
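
Something along these lines, for example (a rough, untested sketch; it assumes
the incoming JSON follows the Avro schema's JSON encoding, and the schema
registry lookup is left out):

import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;
import org.apache.beam.sdk.transforms.DoFn;

class JsonToGenericRecordFn extends DoFn<String, GenericRecord> {
  private final String schemaJson;  // keep the schema as a String so the DoFn stays serializable
  private transient Schema schema;
  private transient GenericDatumReader<GenericRecord> reader;

  JsonToGenericRecordFn(String schemaJson) { this.schemaJson = schemaJson; }

  @Setup
  public void setup() {
    schema = new Schema.Parser().parse(schemaJson);
    reader = new GenericDatumReader<>(schema);
  }

  @ProcessElement
  public void processElement(ProcessContext c) throws IOException {
    // Avro's JSON decoder turns a JSON document that follows the schema into a GenericRecord.
    c.output(reader.read(null, DecoderFactory.get().jsonDecoder(schema, c.element())));
  }
}

The resulting PCollection would still need a coder, e.g.
AvroCoder.of(GenericRecord.class, schema), before handing it to the Kafka write.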


>
>> But I see nowhere in the Beam documentation how to transform my JSON into
>> Avro data (except by deserializing my JSON to a Java class that I will
>> later transform into Avro). Is that deserialization to a class the only
>> way, or is it possible to generate an Avro GenericRecord from my JSON
>> "directly"?
>>
>> Once my Avro data is generated, how can I write it to my Kafka topic?
>>
>>
>> Thanks !
>>
>>


Re: send Avro messages to Kafka using Beam

2019-06-03 Thread Yohei Onishi
Hi Nicolas,

Are you running your job on Dataflow? According to GCP support, Dataflow
currently does not support Schema Registry. But you can still use Schema
Registry to serialize / deserialize your messages using a
custom KafkaAvroSerializer.

In my case I implemented a custom KafkaAvroDeserializer to deserialize
Avro-format messages.
You can implement a custom KafkaAvroSerializer in a similar way.
https://stackoverflow.com/questions/54755668/how-to-deserialising-kafka-avro-messages-using-apache-beam/55917157#55917157
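
For the serializer side, the shape could be roughly like this (class and topic
names below are illustrative, not from my code; the wrapper only adapts
Confluent's KafkaAvroSerializer, which is typed Serializer<Object>, to
Serializer<GenericRecord> so KafkaIO accepts it):

import java.util.Map;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.Serializer;

public class MyKafkaAvroSerializer implements Serializer<GenericRecord> {
  private final KafkaAvroSerializer inner = new KafkaAvroSerializer();

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    // The configs need to include "schema.registry.url".
    inner.configure(configs, isKey);
  }

  @Override
  public byte[] serialize(String topic, GenericRecord record) {
    return inner.serialize(topic, record);
  }

  @Override
  public void close() {
    inner.close();
  }
}

and then plug it into the write step (also a sketch; the schema registry URL
still has to be supplied through KafkaIO's producer configuration):

KafkaIO.<String, GenericRecord>write()
    .withBootstrapServers("broker:9092")              // placeholder
    .withTopic("my-avro-topic")                       // placeholder
    .withKeySerializer(StringSerializer.class)
    .withValueSerializer(MyKafkaAvroSerializer.class)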

I hope this helps you.

Yohei Onishi


On Wed, May 29, 2019 at 5:44 PM Nicolas Delsaux 
wrote:

> Hello all
>
> I have a Beam job that I use to read messages from RabbitMQ and write them
> to Kafka.
>
> As of now, messages are read/written as JSON.
>
> Obviously, that is not an optimal storage format, so I would like to transform
> the messages to Avro before writing them to Kafka. I have the URL of a
> schema registry I can use to store/get my schema.
>
> But I see nowhere in the Beam documentation how to transform my JSON into
> Avro data (except by deserializing my JSON to a Java class that I will
> later transform into Avro). Is that deserialization to a class the only
> way, or is it possible to generate an Avro GenericRecord from my JSON
> "directly"?
>
> Once my Avro data is generated, how can I write it to my Kafka topic?
>
>
> Thanks !
>
>


Re: How to build a beam python pipeline which does GET/POST request to API's

2019-06-03 Thread Ankur Goenka
Looking at your use case, the whole processing logic seems to be very
custom.
I would recommend using ParDos to express it. If the processing for an
individual dictionary is expensive, you can potentially use a Reshuffle
operation to distribute the per-dictionary work over multiple workers
(a sketch follows below).

Note: as you are going to make the write API calls yourself, your transform
can be executed multiple times in case of a worker failure.
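
To illustrate the shape I mean, a minimal sketch in the Python SDK (the endpoint
URLs, field names, and the use of the requests library are placeholders, not
from your description):

import apache_beam as beam
import requests


def fetch_dicts(_seed):
    """Call the source API once and emit each dictionary from the response."""
    response = requests.get('https://source.example.com/items')  # placeholder URL
    for record in response.json():
        yield record


def transform(record):
    record = dict(record)
    record['processed'] = True       # add a key (placeholder)
    record.pop('internal_id', None)  # remove a key (placeholder)
    return record


def post_record(record):
    # Not idempotent by itself: on a worker retry this POST may run again
    # for the same record.
    requests.post('https://sink.example.com/ingest', json=record)  # placeholder URL


with beam.Pipeline() as p:
    (p
     | 'Seed' >> beam.Create([None])               # single element to trigger the fetch
     | 'FetchFromApi' >> beam.FlatMap(fetch_dicts)
     | 'Redistribute' >> beam.Reshuffle()          # spread per-dictionary work across workers
     | 'Transform' >> beam.Map(transform)
     | 'PostToApi' >> beam.Map(post_record))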

On Mon, Jun 3, 2019 at 11:41 AM Anjana Pydi 
wrote:

> Hi Ankur,
>
> Thanks for the reply. Please find my responses inline in the mail below.
>
> Thanks,
> Anjana
> --
> *From:* Ankur Goenka [goe...@google.com]
> *Sent:* Monday, June 03, 2019 11:01 AM
> *To:* user@beam.apache.org
> *Subject:* Re: How to build a beam python pipeline which does GET/POST
> request to API's
>
> Thanks for providing more information.
>
> Some follow up questions/comments
> 1. Call an API which would provide a dictionary as response.
> Question: Do you need to make multiple of these API calls? If yes, what
> distinguishes API call 1 from call 2? If it's the input to the API, can
> you provide the inputs in a file etc.? What I am trying to identify is an
> input source to the pipeline so that Beam can distribute the work.
> Answer: When an API call is made, it can return a list of dictionaries
> as the response; we have to go through every dictionary, apply the same
> transformations to each, and send it.
> 2. Transform dictionary to add / remove few keys.
> 3. Send transformed dictionary as JSON to an API which prints this JSON as
> output.
> Question: Are these write operations idempotent? As you are making your own
> API calls, it's possible that after a failure the calls are made again for
> the same input. If the write calls are not idempotent then there can be
> duplicate data.
> Answer: Suppose I receive a list of 1000 dictionaries as the response
> when I call the API in point 1, I should then do exactly 1000 write operations,
> one for each input. If there is a failure for any input, only that one
> should not be posted and the rest should be posted successfully.
>
> On Sat, Jun 1, 2019 at 8:13 PM Anjana Pydi 
> wrote:
>
>> Hi Ankur,
>>
>> Thanks for the reply! Below are more details of the use case:
>>
>> 1. Call an API which would provide a dictionary as response.
>> 2. Transform dictionary to add / remove few keys.
>> 3. Send transformed dictionary as JSON to an API which prints this JSON
>> as output.
>>
>> Please let me know in case of any clarifications.
>>
>> Thanks,
>> Anjana
>> --
>> *From:* Ankur Goenka [goe...@google.com]
>> *Sent:* Saturday, June 01, 2019 6:47 PM
>> *To:* user@beam.apache.org
>> *Subject:* Re: How to build a beam python pipeline which does GET/POST
>> request to API's
>>
>> Hi Anjana,
>>
>> You can write your API logic in a ParDo and subsequently pass the
>> elements to other ParDos to transform them and eventually make an API call to
>> another endpoint.
>>
>> However, this might not be a good fit for Beam: since there is no
>> well-defined input, scaling and "exactly once" processing of elements will
>> not be possible.
>>
>> It would help to elaborate a bit more on the use case so we can give better
>> suggestions.
>>
>> Thanks,
>> Ankur
>>
>> On Sat, Jun 1, 2019 at 5:50 PM Anjana Pydi 
>> wrote:
>>
>>> Hi,
>>>
>>> I have a requirement to create an Apache Beam Python pipeline that reads
>>> JSON from an API endpoint, transforms it (adds/removes a few fields), and
>>> sends the transformed JSON to another API endpoint.
>>>
>>> Can anyone please provide some suggestions on how to do it.
>>>
>>> Thanks,
>>> Anjana
>>> ---
>>> The information contained in this communication is intended solely for the
>>> use of the individual or entity to whom it is addressed and others
>>> authorized to receive it. It may contain confidential or legally privileged
>>> information. If you are not the intended recipient you are hereby notified
>>> that any disclosure, copying, distribution or taking any action in reliance
>>> on the contents of this information is strictly prohibited and may be
>>> unlawful. If you are not the intended recipient, please notify us
>>> immediately by responding to this email and then delete it from your
>>> system. Bahwan Cybertek is neither liable for the proper and complete
>>> transmission of the information contained in this communication nor for any
>>> delay in its receipt.
>>>

RE: How to build a beam python pipeline which does GET/POST request to API's

2019-06-03 Thread Anjana Pydi
Hi Ankur,

Thanks for the reply. Please find my responses inline in the mail below.

Thanks,
Anjana

From: Ankur Goenka [goe...@google.com]
Sent: Monday, June 03, 2019 11:01 AM
To: user@beam.apache.org
Subject: Re: How to build a beam python pipeline which does GET/POST request to 
API's

Thanks for providing more information.

Some follow up questions/comments
1. Call an API which would provide a dictionary as response.
Question: Do you need to make multiple of these API calls? If yes, what 
distinguishes API call 1 from call 2? If it's the input to the API, can you 
provide the inputs in a file etc.? What I am trying to identify is an input 
source to the pipeline so that Beam can distribute the work.
Answer: When an API call is made, it can return a list of dictionaries as the 
response; we have to go through every dictionary, apply the same transformations 
to each, and send it.
2. Transform dictionary to add / remove few keys.
3. Send transformed dictionary as JSON to an API which prints this JSON as 
output.
Question: Are these write operations idempotent? As you are making your own API 
calls, it's possible that after a failure the calls are made again for the same 
input. If the write calls are not idempotent then there can be duplicate data.
Answer: Suppose I receive a list of 1000 dictionaries as the response when I 
call the API in point 1, I should then do exactly 1000 write operations, one for 
each input. If there is a failure for any input, only that one should not be 
posted and the rest should be posted successfully.

On Sat, Jun 1, 2019 at 8:13 PM Anjana Pydi  wrote:
Hi Ankur,

Thanks for the reply! Below are more details of the use case:

1. Call an API which would provide a dictionary as response.
2. Transform dictionary to add / remove few keys.
3. Send transformed dictionary as JSON to an API which prints this JSON as 
output.

Please let me know in case of any clarifications.

Thanks,
Anjana

From: Ankur Goenka [goe...@google.com]
Sent: Saturday, June 01, 2019 6:47 PM
To: user@beam.apache.org
Subject: Re: How to build a beam python pipeline which does GET/POST request to 
API's

Hi Anjana,

You can write your API logic in a ParDo and subsequently pass the elements to 
other ParDos to transform them and eventually make an API call to another 
endpoint.

However, this might not be a good fit for Beam: since there is no well-defined 
input, scaling and "exactly once" processing of elements will not be possible.

It would help to elaborate a bit more on the use case so we can give better 
suggestions.

Thanks,
Ankur

On Sat, Jun 1, 2019 at 5:50 PM Anjana Pydi  wrote:
Hi,

I have a requirement to create an Apache Beam Python pipeline that reads JSON 
from an API endpoint, transforms it (adds/removes a few fields), and sends the 
transformed JSON to another API endpoint.

Can anyone please provide some suggestions on how to do it.

Thanks,
Anjana
---
 The information contained in this communication is intended solely for the use 
of the individual or entity to whom it is addressed and others authorized to 
receive it. It may contain confidential or legally privileged information. If 
you are not the intended recipient you are hereby notified that any disclosure, 
copying, distribution or taking any action in reliance on the contents of this 
information is strictly prohibited and may be unlawful. If you are not the 
intended recipient, please notify us immediately by responding to this email 
and then delete it from your system. Bahwan Cybertek is neither liable for the 
proper and complete transmission of the information contained in this 
communication nor for any delay in its receipt.
---
 The information contained in this communication is intended solely for the use 
of the individual or entity to whom it is addressed and others authorized to 
receive it. It may contain confidential or legally privileged information. If 
you are not the intended recipient you are hereby notified that any disclosure, 
copying, distribution or taking any action in reliance on the contents of this 
information is strictly prohibited and may be unlawful. If you are not the 
intended recipient, please notify us immediately by responding to this email 
and then delete it from your system. Bahwan Cybertek is neither liable for the 
proper and complete transmission of the information contained in this 
communication nor for any delay in its receipt.

Re: How to build a beam python pipeline which does GET/POST request to API's

2019-06-03 Thread Ankur Goenka
Thanks for providing more information.

Some follow up questions/comments
1. Call an API which would provide a dictionary as response.
Question: Do you need to make multiple of these API calls? If yes, what
distinguishes API call 1 from call 2? If it's the input to the API, can
you provide the inputs in a file etc.? What I am trying to identify is an
input source to the pipeline so that Beam can distribute the work.
2. Transform dictionary to add / remove few keys.
3. Send transformed dictionary as JSON to an API which prints this JSON as
output.
Question: Are these write operations idempotent? As you are making your own
API calls, it's possible that after a failure the calls are made again for
the same input. If the write calls are not idempotent then there can be
duplicate data.

On Sat, Jun 1, 2019 at 8:13 PM Anjana Pydi 
wrote:

> Hi Ankur,
>
> Thanks for the reply! Below are more details of the use case:
>
> 1. Call an API which would provide a dictionary as response.
> 2. Transform dictionary to add / remove few keys.
> 3. Send transformed dictionary as JSON to an API which prints this JSON as
> output.
>
> Please let me know in case of any clarifications.
>
> Thanks,
> Anjana
> --
> *From:* Ankur Goenka [goe...@google.com]
> *Sent:* Saturday, June 01, 2019 6:47 PM
> *To:* user@beam.apache.org
> *Subject:* Re: How to build a beam python pipeline which does GET/POST
> request to API's
>
> Hi Anjana,
>
> You can write your API logic in a ParDo and subsequently pass the elements
> to other ParDos to transform them and eventually make an API call to another
> endpoint.
>
> However, this might not be a good fit for Beam: since there is no
> well-defined input, scaling and "exactly once" processing of elements will
> not be possible.
>
> It would help to elaborate a bit more on the use case so we can give better
> suggestions.
>
> Thanks,
> Ankur
>
> On Sat, Jun 1, 2019 at 5:50 PM Anjana Pydi 
> wrote:
>
>> Hi,
>>
>> I have a requirement to create an Apache Beam Python pipeline that reads
>> JSON from an API endpoint, transforms it (adds/removes a few fields), and
>> sends the transformed JSON to another API endpoint.
>>
>> Can anyone please provide some suggestions on how to do it.
>>
>> Thanks,
>> Anjana
>> ---
>> The information contained in this communication is intended solely for the
>> use of the individual or entity to whom it is addressed and others
>> authorized to receive it. It may contain confidential or legally privileged
>> information. If you are not the intended recipient you are hereby notified
>> that any disclosure, copying, distribution or taking any action in reliance
>> on the contents of this information is strictly prohibited and may be
>> unlawful. If you are not the intended recipient, please notify us
>> immediately by responding to this email and then delete it from your
>> system. Bahwan Cybertek is neither liable for the proper and complete
>> transmission of the information contained in this communication nor for any
>> delay in its receipt.
>>
> ---
> The information contained in this communication is intended solely for the
> use of the individual or entity to whom it is addressed and others
> authorized to receive it. It may contain confidential or legally privileged
> information. If you are not the intended recipient you are hereby notified
> that any disclosure, copying, distribution or taking any action in reliance
> on the contents of this information is strictly prohibited and may be
> unlawful. If you are not the intended recipient, please notify us
> immediately by responding to this email and then delete it from your
> system. Bahwan Cybertek is neither liable for the proper and complete
> transmission of the information contained in this communication nor for any
> delay in its receipt.
>


Re: Writing and serializing a custom WindowFn

2019-06-03 Thread Chad Dombrova
Hi Robert,
Thanks for the response.

As you've discovered, fully custom merging window fns are not yet
> supported portably, though this is on our todo list.
>
> https://issues.apache.org/jira/browse/BEAM-5601
>

Thanks for linking me to that.  I've watched it and voted for it, and maybe
I'll even take a peek at what it would take to implement if it appears to
be the best way forward for us.

Note that it's tricky to get the exact behavior you need as data may
> come in out of order. Consider, for example, three events of
> increasing timestamp e1, e2, e3 where e2 is the "end" event. It could
> happen that e1 and e3 get merged before e2 is seen, and there's no
> "unmerge" capability (the values may already be combined via an
> upstream combiner). How do you handle this?
>

This is something I've been wondering about myself.  I read the excellent
book Streaming Systems and it seems that the preferred way to solve this is
using event timestamps and watermarks, but that raised a few questions for
me.

First, just to clarify: you're correct that your example scenario could
occur _in processing time_, but our system can guarantee that it does not
happen in event time, i.e. the e2 "end" event will always have an event
timestamp later than e1 and e3.

So, can I use event time with watermarks to solve this problem?

IIUC, python does not have support for late arriving data, so that seems
like a pretty big issue.  If it did, would that be the preferred way of
solving this problem, or is that not enough?  If late data is indeed not
currently supported, then the critique of my custom WindowFn would apply to
Sessions in general would it not?

Is handling of late data in python something that's slated for an upcoming
release?

In the meantime, you could try getting this behavior with StatefulDoFns.
>

Is this fully supported by python now?  I've read some conflicting
information on the subject.
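
For reference, this is the rough StatefulDoFn shape I have in mind (a sketch
using BagStateSpec from apache_beam.transforms.userstate over a keyed stream of
strings; 'END' is a stand-in for our real end-of-stream marker, and I realize
whether this runs depends on the runner's state support):

import apache_beam as beam
from apache_beam.coders import StrUtf8Coder
from apache_beam.transforms.userstate import BagStateSpec


class GatherUntilEnd(beam.DoFn):
    # Per-key buffer of elements seen so far.
    BUFFER = BagStateSpec('buffer', StrUtf8Coder())

    def process(self, element, buffer=beam.DoFn.StateParam(BUFFER)):
        key, value = element
        if value == 'END':
            # Emit everything buffered for this key, then reset the state.
            yield key, list(buffer.read())
            buffer.clear()
        else:
            buffer.add(value)


# usage (sketch), on elements like ('some-key', 'value'):
# gathered = keyed_stream | beam.ParDo(GatherUntilEnd())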

thanks again for the feedback!

(btw, I'm still experimenting with the static type checking issue:
https://github.com/python/mypy/issues/6933)

-chad


Re: SQL massively more resource-intensive? Memory leak?

2019-06-03 Thread Rui Wang
Ha, sorry, I was only reading the screenshots and missed your other comments. So
the Count fn did indeed work.

Can I ask whether your SQL pipeline works on the direct runner?


-Rui

On Mon, Jun 3, 2019 at 10:39 AM Rui Wang  wrote:

> BeamSQL actually converts a SELECT COUNT(*) query into a Java pipeline
> that just calls the Java SDK's built-in Count[1] transform.
>
> Could you implement your pipeline with the Count transform to see whether this
> memory issue still exists? Doing so would narrow the problem down a bit: if the
> pipeline works when you use Java directly, without going through the SQL code
> path, we will know that BeamSQL does not generate a working pipeline for your
> SELECT COUNT(*) query.
>
>
> [1]:
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java#L54
>
> On Sat, Jun 1, 2019 at 9:30 AM dekl...@gmail.com 
> wrote:
>
>> Is using Beam SQL just massively more resource-intensive or is there a
>> bug somewhere here (in my code or elsewhere)? (I'm using Flink runner)
>>
>> Here is my code:
>> https://pastebin.com/nNtc9ZaG
>>
>> Here is the error I get (truncated at the end because it's so long and
>> seemingly repetitive) when I run the SQL transform and my memory/CPU usage
>> skyrockets:
>> https://pastebin.com/mywmkCQi
>>
>> For example,
>>
>> After several early firing triggers, 13-15% CPU, 1.5GB-2GB RAM,
>> everything working fine:
>>
>> `rowStream.apply(Combine.globally(Count.combineFn()).withoutDefaults()).apply(myPrint());`
>>
>> After a single early firing trigger, CPU usage shoots to 90%+, 4.7GB+
>> RAM, soon crashes:
>> `rowStream.apply(SqlTransform.query("SELECT COUNT(*) FROM
>> PCOLLECTION")).apply(myPrint());`
>>
>> I can't imagine this is expected behavior but maybe I'm just ignorant of
>> how SQL is implemented.
>>
>> Most of this code is just getting Schema Registry Avro Kafka messages
>> into a Row stream. There have been discussions on the mailing list recently
>> about how to do that. This is the best I could do. If that part is
>> incorrect I'd be glad to know.
>>
>> Any help appreciated. Thank you.
>>
>>


Re: SQL massively more resource-intensive? Memory leak?

2019-06-03 Thread Rui Wang
BeamSQL actually converts a SELECT COUNT(*) query into a Java pipeline
that just calls the Java SDK's built-in Count[1] transform.

Could you implement your pipeline with the Count transform to see whether this
memory issue still exists? Doing so would narrow the problem down a bit: if the
pipeline works when you use Java directly, without going through the SQL code
path, we will know that BeamSQL does not generate a working pipeline for your
SELECT COUNT(*) query.


[1]:
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java#L54

On Sat, Jun 1, 2019 at 9:30 AM dekl...@gmail.com  wrote:

> Is using Beam SQL just massively more resource-intensive or is there a bug
> somewhere here (in my code or elsewhere)? (I'm using Flink runner)
>
> Here is my code:
> https://pastebin.com/nNtc9ZaG
>
> Here is the error I get (truncated at the end because it's so long and
> seemingly repetitive) when I run the SQL transform and my memory/CPU usage
> skyrockets:
> https://pastebin.com/mywmkCQi
>
> For example,
>
> After several early firing triggers, 13-15% CPU, 1.5GB-2GB RAM, everything
> working fine:
>
> `rowStream.apply(Combine.globally(Count.combineFn()).withoutDefaults()).apply(myPrint());`
>
> After a single early firing trigger, CPU usage shoots to 90%+, 4.7GB+ RAM,
> soon crashes:
> `rowStream.apply(SqlTransform.query("SELECT COUNT(*) FROM
> PCOLLECTION")).apply(myPrint());`
>
> I can't imagine this is expected behavior but maybe I'm just ignorant of
> how SQL is implemented.
>
> Most of this code is just getting Schema Registry Avro Kafka messages into
> a Row stream. There have been discussions on the mailing list recently
> about how to do that. This is the best I could do. If that part is
> incorrect I'd be glad to know.
>
> Any help appreciated. Thank you.
>
>


Re: Windows were processed out of order

2019-06-03 Thread Robert Bradshaw
Yep, that's correct.

On Mon, Jun 3, 2019 at 2:06 PM Juan Carlos Garcia  wrote:
>
> Hi Robert,
>
> The elements of a PCollection are unordered. >> Yes this is something known 
> and understood given the nature of a PCollection.
>
> So that means that when we replay past data (we rewind our 
> kafka consumer groups), within 1h of processing time there might be multiple 1h 
> windows for a given GBK, and hence these windows are fired in an arbitrary order?
>
> Thanks for the insight!
> JC
>
> On Mon, Jun 3, 2019 at 1:50 PM Robert Bradshaw  wrote:
>>
>> The elements of a PCollection are unordered. This includes the results
>> of a GBK--there is no promise that the output be processed in any (in
>> particular, windows ordered by timestamp) order. DoFns, especially ones
>> with side effects, should be written with this in mind.
>>
>> (There is actually ongoing discussion on the dev list about how to
>> make it easier to write order-sensitive operations.)
>>
>> On Mon, Jun 3, 2019 at 1:17 PM Juan Carlos Garcia  
>> wrote:
>> >
>> > Hi Folks,
>> >
>> > My team and I have a situation that we cannot explain and
>> > would like to hear your thoughts on. We have a pipeline which
>> > enriches the incoming messages and writes them to BigQuery, the pipeline 
>> > looks like this:
>> >
>> > Apache Beam 2.12.0 / GCP Dataflow
>> >
>> > -
>> > - ReadFromKafka (with withCreateTime and 10min MaxDelay)
>> > - ApplySomeInitialEnrichment (just add some stuff to the incoming json 
>> > messages)
>> > - Apply a Fixed 1 hour window (with default triggering)
>> > - Apply Group By Key (userId)
>> > - Apply External Sorter 
>> > (SortValues.create(BufferedExternalSorter.options(
>> > - Apply ComplexEnrichmentDoFn (to the sorted Iterable<>)
>> >   - Read initial state from Hbase (BigTable)
>> >   - loop thru all messages, enriching them with the previous state 
>> > (incremental enrichment) and session calculation
>> >   - write the final state to Hbase (BigTable)
>> >   - output each of the enriched element to the next DoFn
>> > - Apply a Transformation to prepare the data to BigQuery
>> > - Apply BigQueryIO
>> > --
>> >
>> >
>> > Just to give some more context we have a meta_info column in our BigQuery 
>> > table which values are set at the very beginning of the 
>> > ComplexEnrichmentDoFn, meaning all the records within the same Iterable<> 
>> > will hold the same information. The meta_info column contains the 
>> > serialized PaneInfo, WindowInfo and our SystemTimestamp = 
>> > currentTimeMilliseconds.
>> >
>> > We have 3 windows:
>> >   A-windowInfo":"[2019-05-20T01:00:00.000Z..2019-05-20T02:00:00.000Z), 
>> > systemTimestamp: 1559396670577
>> >   B-windowInfo":"[2019-05-20T02:00:00.000Z..2019-05-20T03:00:00.000Z), 
>> > systemTimestamp: 1559396670670
>> >   C-windowInfo":"[2019-05-20T03:00:00.000Z..2019-05-20T04:00:00.000Z), 
>> > systemTimestamp: 1559396670533
>> >
>> >
>> > window A contains: 18 records
>> > window B contains: 46 records
>> > window C contains: 3  records
>> >
>> > If you pay attention to the A, B, C windowInfo from above, the 
>> > `systemTimestamp` field reflects an incorrect order of processing, and the 
>> > enrichment was executed as C -> A ->  B, corrupting all the messages for 
>> > this given user.
>> >
>> > For all 3 windows the serialized PaneInfo was set by the runner to ON_TIME:
>> > A=B=C= "PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, 
>> > onTimeIndex=0}"
>> >
>> > Any idea why the windows would be triggered out of order?
>> >
>> > --
>> >
>> > JC
>> >
>
>
>
> --
>
> JC
>


Re: Windows were processed out of order

2019-06-03 Thread Juan Carlos Garcia
Hi Robert,

*The elements of a PCollection are unordered.* >> Yes this is something
known and understood given the nature of a PCollection.

So that means that when we replay past data (we rewind our
kafka consumer groups), within 1h of processing time there might be multiple
1h windows for a given GBK, and hence these windows are fired in an arbitrary
order?

Thanks for the insight!
JC

On Mon, Jun 3, 2019 at 1:50 PM Robert Bradshaw  wrote:

> The elements of a PCollection are unordered. This includes the results
> of a GBK--there is no promise that the output be processed in any (in
> particular, windows ordered by timestamp) order. DoFns, especially ones
> with side effects, should be written with this in mind.
>
> (There is actually ongoing discussion on the dev list about how to
> make it easier to write order-sensitive operations.)
>
> On Mon, Jun 3, 2019 at 1:17 PM Juan Carlos Garcia 
> wrote:
> >
> > Hi Folks,
> >
> > My team and I have a situation that we cannot explain and
> > would like to hear your thoughts on. We have a pipeline which
> > enriches the incoming messages and writes them to BigQuery, the pipeline
> looks like this:
> >
> > Apache Beam 2.12.0 / GCP Dataflow
> >
> > -
> > - ReadFromKafka (with withCreateTime and 10min MaxDelay)
> > - ApplySomeInitialEnrichment (just add some stuff to the incoming json
> messages)
> > - Apply a Fixed 1 hour window (with default triggering)
> > - Apply Group By Key (userId)
> > - Apply External Sorter
> (SortValues.create(BufferedExternalSorter.options(
> > - Apply ComplexEnrichmentDoFn (to the sorted Iterable<>)
> >   - Read initial state from Hbase (BigTable)
> >   - loop thru all messages, enriching them with the previous state
> (incremental enrichment) and session calculation
> >   - write the final state to Hbase (BigTable)
> >   - output each of the enriched element to the next DoFn
> > - Apply a Transformation to prepare the data to BigQuery
> > - Apply BigQueryIO
> > --
> >
> >
> > Just to give some more context we have a meta_info column in our
> BigQuery table which values are set at the very beginning of the
> ComplexEnrichmentDoFn, meaning all the records within the same Iterable<>
> will hold the same information. The meta_info column contains the
> serialized PaneInfo, WindowInfo and our SystemTimestamp =
> currentTimeMilliseconds.
> >
> > We have 3 windows:
> >   A-windowInfo":"[2019-05-20T01:00:00.000Z..2019-05-20T02:00:00.000Z),
> systemTimestamp: 1559396670577
> >   B-windowInfo":"[2019-05-20T02:00:00.000Z..2019-05-20T03:00:00.000Z),
> systemTimestamp: 1559396670670
> >   C-windowInfo":"[2019-05-20T03:00:00.000Z..2019-05-20T04:00:00.000Z),
> systemTimestamp: 1559396670533
> >
> >
> > window A contains: 18 records
> > window B contains: 46 records
> > window C contains: 3  records
> >
> > If you pay attention to the A, B, C windowInfo from above, the
> `systemTimestamp` field reflects an incorrect order of processing, and the
> enrichment was executed as C -> A ->  B, corrupting all the messages for
> this given user.
> >
> > For all 3 windows the serialized PaneInfo was set by the runner to
> ON_TIME:
> > A=B=C= "PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0,
> onTimeIndex=0}"
> >
> > Any idea why the windows would be triggered out of order?
> >
> > --
> >
> > JC
> >
>


-- 

JC


Re: Windows were processed out of order

2019-06-03 Thread Robert Bradshaw
The elements of a PCollection are unordered. This includes the results
of a GBK--there is no promise that the output be processed in any (in
particular, windows ordered by timestamp) order. DoFns, especially ones
with side effects, should be written with this in mind.

(There is actually ongoing discussion on the dev list about how to
make it easier to write order-sensitive operations.)
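
To illustrate what I mean by writing side-effecting DoFns defensively (purely a
sketch; the two state-store helpers are stubs, not anything from your pipeline):
persist the window end together with the state, and don't let an older window's
firing overwrite newer state.

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;

class GuardedEnrichFn extends DoFn<KV<String, Iterable<String>>, String> {
  @ProcessElement
  public void processElement(ProcessContext c, BoundedWindow window) {
    String userId = c.element().getKey();
    long thisWindowEnd = window.maxTimestamp().getMillis();
    long storedWindowEnd = readLastWindowEnd(userId);  // stub for the external state lookup
    if (thisWindowEnd >= storedWindowEnd) {
      // enrich the elements, c.output(...) them, and persist the new state
      // together with thisWindowEnd
      writeState(userId, thisWindowEnd);
    }
    // else: an older window fired late; merge or skip rather than overwrite newer state
  }

  private long readLastWindowEnd(String userId) { return Long.MIN_VALUE; }  // stub
  private void writeState(String userId, long windowEnd) {}                 // stub
}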

On Mon, Jun 3, 2019 at 1:17 PM Juan Carlos Garcia  wrote:
>
> Hi Folks,
>
> My team and I have a situation that we cannot explain and
> would like to hear your thoughts on. We have a pipeline which
> enriches the incoming messages and writes them to BigQuery, the pipeline looks 
> like this:
>
> Apache Beam 2.12.0 / GCP Dataflow
>
> -
> - ReadFromKafka (with withCreateTime and 10min MaxDelay)
> - ApplySomeInitialEnrichment (just add some stuff to the incoming json 
> messages)
> - Apply a Fixed 1 hour window (with default triggering)
> - Apply Group By Key (userId)
> - Apply External Sorter (SortValues.create(BufferedExternalSorter.options(
> - Apply ComplexEnrichmentDoFn (to the sorted Iterable<>)
>   - Read initial state from Hbase (BigTable)
>   - loop thru all messages, enriching them with the previous state 
> (incremental enrichment) and session calculation
>   - write the final state to Hbase (BigTable)
>   - output each of the enriched element to the next DoFn
> - Apply a Transformation to prepare the data to BigQuery
> - Apply BigQueryIO
> --
>
>
> Just to give some more context we have a meta_info column in our BigQuery 
> table which values are set at the very beginning of the 
> ComplexEnrichmentDoFn, meaning all the records within the same Iterable<> 
> will hold the same information. The meta_info column contains the serialized 
> PaneInfo, WindowInfo and our SystemTimestamp = currentTimeMilliseconds.
>
> We have 3 windows:
>   A-windowInfo":"[2019-05-20T01:00:00.000Z..2019-05-20T02:00:00.000Z), 
> systemTimestamp: 1559396670577
>   B-windowInfo":"[2019-05-20T02:00:00.000Z..2019-05-20T03:00:00.000Z), 
> systemTimestamp: 1559396670670
>   C-windowInfo":"[2019-05-20T03:00:00.000Z..2019-05-20T04:00:00.000Z), 
> systemTimestamp: 1559396670533
>
>
> window A contains: 18 records
> window B contains: 46 records
> window C contains: 3  records
>
> If you pay attention to the A, B, C windowInfo from above, the 
> `systemTimestamp` field reflects an incorrect order of processing, and the 
> enrichment was executed as C -> A ->  B, corrupting all the messages for this 
> given user.
>
> For all 3 windows the serialized PaneInfo was set by the runner to ON_TIME:
> A=B=C= "PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, 
> onTimeIndex=0}"
>
> Any idea why the windows would be triggered out of order?
>
> --
>
> JC
>


Windows were processed out of order

2019-06-03 Thread Juan Carlos Garcia
Hi Folks,

My team and I have a situation that we cannot explain and
would like to hear your thoughts on. We have a pipeline which
enriches the incoming messages and writes them to BigQuery, the pipeline looks
like this:

Apache Beam 2.12.0 / GCP Dataflow

-
- ReadFromKafka (with withCreateTime and 10min MaxDelay)
- ApplySomeInitialEnrichment (just add some stuff to the incoming json
messages)
- Apply a Fixed 1 hour window (with default triggering)
- Apply Group By Key (userId)
- Apply External Sorter
(SortValues.create(BufferedExternalSorter.options(
- Apply ComplexEnrichmentDoFn (to the sorted Iterable<>)
  - Read initial state from Hbase (BigTable)
  - loop thru all messages, enriching them with the previous state
(incremental enrichment) and session calculation
  - write the final state to Hbase (BigTable)
  - output each of the enriched element to the next DoFn
- Apply a Transformation to prepare the data to BigQuery
- Apply BigQueryIO
--


Just to give some more context we have a meta_info column in our BigQuery
table which values are set at the very beginning of the
ComplexEnrichmentDoFn, meaning all the records within the same Iterable<>
will hold the same information. The meta_info column contains the
serialized PaneInfo, WindowInfo and our SystemTimestamp =
currentTimeMilliseconds.

We have 3 windows:
  A-windowInfo":"[2019-05-20T01:00:00.000Z..2019-05-20T02:00:00.000Z),
systemTimestamp: *1559396670577*
  B-windowInfo":"[2019-05-20T02:00:00.000Z..2019-05-20T03:00:00.000Z),
systemTimestamp: *1559396670670*
  C-windowInfo":"[2019-05-20T03:00:00.000Z..2019-05-20T04:00:00.000Z),
systemTimestamp: *1559396670533*


window A contains: 18 records
window B contains: 46 records
window C contains: 3  records

If you pay attention to the A, B, C windowInfo from above, the
`*systemTimestamp*` field reflects an incorrect order of processing, and the
enrichment was executed as C -> A -> B, corrupting all the messages for
this given user.

For all 3 windows the serialized PaneInfo was set by the runner to ON_TIME:
A=B=C= "PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0,
onTimeIndex=0}"

Any idea why the windows would be triggered out of order?

-- 

JC


Re: Writing and serializing a custom WindowFn

2019-06-03 Thread Robert Bradshaw
Hi Chad!

As you've discovered, fully custom merging window fns are not yet
supported portably, though this is on our todo list.

https://issues.apache.org/jira/browse/BEAM-5601

This involves calling back into the SDK to perform the actual
merging logic (and also, for full generality, being able to extract
the endpoints of unknown window types).

Note that it's tricky to get the exact behavior you need as data may
come in out of order. Consider, for example, three events of
increasing timestamp e1, e2, e3 where e2 is the "end" event. It could
happen that e1 and e3 get merged before e2 is seen, and there's no
"unmerge" capability (the values may already be combined via an
upstream combiner). How do you handle this?

In the meantime, you could try getting this behavior with StatefulDoFns.


On Sun, Jun 2, 2019 at 6:33 AM Chad Dombrova  wrote:
>
> Hi all,
> I'm currently investigating Beam for our company's needs, and I have a 
> question about how to solve a specific windowing problem in python.
>
> I have a stream of elements and I want to gather them until a special 
> end-of-stream element arrives.
>
> To solve this, I've written a modified version of window.Sessions that takes 
> an optional callable which is passed an element when it is being assigned to 
> a window.  If the callable returns True, the window is immediately closed.  
> Here's the code:
>
> https://gist.github.com/chadrik/b0dfff8953fed99f99bdd69c7cc870ba
>
> It works as expected in the Direct runner, but fails in Dataflow, which I'm 
> pretty sure is due to a serialization problem.
>
> So, I have a few questions:
>
> 1) Is there a way to do this in python using stock components? (i.e. without 
> my custom class)
>
> 2) If not, is there any interest in accepting a PR to modify the stock 
> window.Sessions to do something like what I've written?  It seems very 
> useful, and I've seen other people attempting to solve this same problem 
> [1][2]
>
> 3) If not, how do I go about serializing my custom WindowFn class?  From 
> looking at the source code I'm certain I could figure out how to extend the 
> serialization for a stock object like window.Sessions (modify 
> standard_window_fns.proto, and update to/from_runner_api_parameter), but it's 
> very unclear how I would do this for a custom WindowFn, since serialization 
> of these classes seems to be part of the official beam gRPC portability 
> protocol.
>
> Thanks in advance for your help!
> -chad
>
>
> 1. 
> https://stackoverflow.com/questions/49011360/using-a-custom-windowfn-on-google-dataflow-which-closes-after-an-element-value
> 2. 
> https://stackoverflow.com/questions/43035302/close-window-after-based-on-element-value/43052779#43052779
>
>
>
>
>
>