Re: Triggers based on size

2018-01-10 Thread Carlos Alonso
Hi Robert, Kenneth.

Thanks a lot to both of you for your responses!!

Kenneth, unfortunately I'm not sure we're experienced enough with Apache
Beam to get anywhere close to your suggestion, but thanks anyway!!

Robert, your suggestion sounds great to me, could you please provide any
example on how to use that 'metadata driven' trigger?

Thanks!

On Tue, Jan 9, 2018 at 9:11 PM Kenneth Knowles  wrote:

> Often, when you need or want more control than triggers provide, such as
> input-type-specific logic like yours, you can use state and timers in ParDo
> to control when to output. You lose any potential optimizations of Combine
> based on associativity/commutativity and assume the burden of making sure
> your output is sensible, but dropping to low-level stateful computation may
> be your best bet.
>
> Kenn
>
> On Tue, Jan 9, 2018 at 11:59 AM, Robert Bradshaw 
> wrote:
>
>> We've tossed around the idea of "metadata-driven" triggers which would
>> essentially let you provide a mapping element -> metadata and a
>> monotonic CombineFn metadata* -> bool that would allow for this (the
>> AfterCount being a special case of this, with the mapping fn being _
>> -> 1, and the CombineFn being sum(...) >= N, for size one would
>> provide a (perhaps approximate) sizing mapping fn).
>>
>> Note, however, that there's no guarantee that the trigger fire as soon
>> as possible; due to runtime characteristics a significant amount of
>> data may be buffered (or come in at once) before the trigger is
>> queried. One possibility would be to follow your triggering with a
>> DoFn that breaks up large value streams into multiple manageable sized
>> ones as needed.
>>
>> On Tue, Jan 9, 2018 at 11:43 AM, Carlos Alonso 
>> wrote:
>> > Hi everyone!!
>> >
>> > I was wondering if there is an option to trigger window panes based on
>> the
>> > size of the pane itself (rather than the number of elements).
>> >
>> > To provide a little bit more of context we're backing up a PubSub topic
>> into
>> > GCS with the "special" feature that, depending on the "type" of the
>> message,
>> > the GCS destination is one or another.
>> >
>> > Messages' 'shape' published there is quite random, some of them are very
>> > frequent and small, some others very big but sparse... We have around
>> 150
>> > messages per second (in total) and we're firing every 15 minutes and
>> > experiencing OOM errors, we've considered firing based on the number of
>> > items as well, but given the randomness of the input, I don't think it
>> will
>> > be a final solution either.
>> >
>> > Having a trigger based on size would be great, another option would be
>> to
>> > have a dynamic shards number for the PTransform that actually writes the
>> > files.
>> >
>> > What is your recommendation for this use case?
>> >
>> > Thanks!!
>>
>
>


Re: Triggers based on size

2018-01-10 Thread Robert Bradshaw
Unfortunately, the metadata driven trigger is still just an idea, not
yet implemented.

A good introduction to state and timers can be found at
https://beam.apache.org/blog/2017/08/28/timely-processing.html

On Wed, Jan 10, 2018 at 1:08 AM, Carlos Alonso  wrote:
> Hi Robert, Kenneth.
>
> Thanks a lot to both of you for your responses!!
>
> Kenneth, unfortunately I'm not sure we're experienced enough with Apache
> Beam to get anywhere close to your suggestion, but thanks anyway!!
>
> Robert, your suggestion sounds great to me, could you please provide any
> example on how to use that 'metadata driven' trigger?
>
> Thanks!
>
> On Tue, Jan 9, 2018 at 9:11 PM Kenneth Knowles  wrote:
>>
>> Often, when you need or want more control than triggers provide, such as
>> input-type-specific logic like yours, you can use state and timers in ParDo
>> to control when to output. You lose any potential optimizations of Combine
>> based on associativity/commutativity and assume the burden of making sure
>> your output is sensible, but dropping to low-level stateful computation may
>> be your best bet.
>>
>> Kenn
>>
>> On Tue, Jan 9, 2018 at 11:59 AM, Robert Bradshaw 
>> wrote:
>>>
>>> We've tossed around the idea of "metadata-driven" triggers which would
>>> essentially let you provide a mapping element -> metadata and a
>>> monotonic CombineFn metadata* -> bool that would allow for this (the
>>> AfterCount being a special case of this, with the mapping fn being _
>>> -> 1, and the CombineFn being sum(...) >= N, for size one would
>>> provide a (perhaps approximate) sizing mapping fn).
>>>
>>> Note, however, that there's no guarantee that the trigger fire as soon
>>> as possible; due to runtime characteristics a significant amount of
>>> data may be buffered (or come in at once) before the trigger is
>>> queried. One possibility would be to follow your triggering with a
>>> DoFn that breaks up large value streams into multiple manageable sized
>>> ones as needed.
>>>
>>> On Tue, Jan 9, 2018 at 11:43 AM, Carlos Alonso 
>>> wrote:
>>> > Hi everyone!!
>>> >
>>> > I was wondering if there is an option to trigger window panes based on
>>> > the
>>> > size of the pane itself (rather than the number of elements).
>>> >
>>> > To provide a little bit more of context we're backing up a PubSub topic
>>> > into
>>> > GCS with the "special" feature that, depending on the "type" of the
>>> > message,
>>> > the GCS destination is one or another.
>>> >
>>> > Messages' 'shape' published there is quite random, some of them are
>>> > very
>>> > frequent and small, some others very big but sparse... We have around
>>> > 150
>>> > messages per second (in total) and we're firing every 15 minutes and
>>> > experiencing OOM errors, we've considered firing based on the number of
>>> > items as well, but given the randomness of the input, I don't think it
>>> > will
>>> > be a final solution either.
>>> >
>>> > Having a trigger based on size would be great, another option would be
>>> > to
>>> > have a dynamic shards number for the PTransform that actually writes
>>> > the
>>> > files.
>>> >
>>> > What is your recommendation for this use case?
>>> >
>>> > Thanks!!
>>
>>
>


Re: Triggers based on size

2018-01-10 Thread Carlos Alonso
Thanks Robert!!

After reading this and the former post about stateful processing Kenneth's
suggestions sounds sensible. I'll probably give them a try!! Is there
anything you would like to advice me before starting?

Thanks!

On Wed, Jan 10, 2018 at 10:13 AM Robert Bradshaw 
wrote:

> Unfortunately, the metadata driven trigger is still just an idea, not
> yet implemented.
>
> A good introduction to state and timers can be found at
> https://beam.apache.org/blog/2017/08/28/timely-processing.html
>
> On Wed, Jan 10, 2018 at 1:08 AM, Carlos Alonso 
> wrote:
> > Hi Robert, Kenneth.
> >
> > Thanks a lot to both of you for your responses!!
> >
> > Kenneth, unfortunately I'm not sure we're experienced enough with Apache
> > Beam to get anywhere close to your suggestion, but thanks anyway!!
> >
> > Robert, your suggestion sounds great to me, could you please provide any
> > example on how to use that 'metadata driven' trigger?
> >
> > Thanks!
> >
> > On Tue, Jan 9, 2018 at 9:11 PM Kenneth Knowles  wrote:
> >>
> >> Often, when you need or want more control than triggers provide, such as
> >> input-type-specific logic like yours, you can use state and timers in
> ParDo
> >> to control when to output. You lose any potential optimizations of
> Combine
> >> based on associativity/commutativity and assume the burden of making
> sure
> >> your output is sensible, but dropping to low-level stateful computation
> may
> >> be your best bet.
> >>
> >> Kenn
> >>
> >> On Tue, Jan 9, 2018 at 11:59 AM, Robert Bradshaw 
> >> wrote:
> >>>
> >>> We've tossed around the idea of "metadata-driven" triggers which would
> >>> essentially let you provide a mapping element -> metadata and a
> >>> monotonic CombineFn metadata* -> bool that would allow for this (the
> >>> AfterCount being a special case of this, with the mapping fn being _
> >>> -> 1, and the CombineFn being sum(...) >= N, for size one would
> >>> provide a (perhaps approximate) sizing mapping fn).
> >>>
> >>> Note, however, that there's no guarantee that the trigger fire as soon
> >>> as possible; due to runtime characteristics a significant amount of
> >>> data may be buffered (or come in at once) before the trigger is
> >>> queried. One possibility would be to follow your triggering with a
> >>> DoFn that breaks up large value streams into multiple manageable sized
> >>> ones as needed.
> >>>
> >>> On Tue, Jan 9, 2018 at 11:43 AM, Carlos Alonso 
> >>> wrote:
> >>> > Hi everyone!!
> >>> >
> >>> > I was wondering if there is an option to trigger window panes based
> on
> >>> > the
> >>> > size of the pane itself (rather than the number of elements).
> >>> >
> >>> > To provide a little bit more of context we're backing up a PubSub
> topic
> >>> > into
> >>> > GCS with the "special" feature that, depending on the "type" of the
> >>> > message,
> >>> > the GCS destination is one or another.
> >>> >
> >>> > Messages' 'shape' published there is quite random, some of them are
> >>> > very
> >>> > frequent and small, some others very big but sparse... We have around
> >>> > 150
> >>> > messages per second (in total) and we're firing every 15 minutes and
> >>> > experiencing OOM errors, we've considered firing based on the number
> of
> >>> > items as well, but given the randomness of the input, I don't think
> it
> >>> > will
> >>> > be a final solution either.
> >>> >
> >>> > Having a trigger based on size would be great, another option would
> be
> >>> > to
> >>> > have a dynamic shards number for the PTransform that actually writes
> >>> > the
> >>> > files.
> >>> >
> >>> > What is your recommendation for this use case?
> >>> >
> >>> > Thanks!!
> >>
> >>
> >
>


Re: Triggers based on size

2018-01-10 Thread Robert Bradshaw
Sounds like you have enough to get started. Feel free to come back
here with more specifics if you can't get it working.

On Wed, Jan 10, 2018 at 9:09 AM, Carlos Alonso  wrote:
> Thanks Robert!!
>
> After reading this and the former post about stateful processing Kenneth's
> suggestions sounds sensible. I'll probably give them a try!! Is there
> anything you would like to advice me before starting?
>
> Thanks!
>
> On Wed, Jan 10, 2018 at 10:13 AM Robert Bradshaw 
> wrote:
>>
>> Unfortunately, the metadata driven trigger is still just an idea, not
>> yet implemented.
>>
>> A good introduction to state and timers can be found at
>> https://beam.apache.org/blog/2017/08/28/timely-processing.html
>>
>> On Wed, Jan 10, 2018 at 1:08 AM, Carlos Alonso 
>> wrote:
>> > Hi Robert, Kenneth.
>> >
>> > Thanks a lot to both of you for your responses!!
>> >
>> > Kenneth, unfortunately I'm not sure we're experienced enough with Apache
>> > Beam to get anywhere close to your suggestion, but thanks anyway!!
>> >
>> > Robert, your suggestion sounds great to me, could you please provide any
>> > example on how to use that 'metadata driven' trigger?
>> >
>> > Thanks!
>> >
>> > On Tue, Jan 9, 2018 at 9:11 PM Kenneth Knowles  wrote:
>> >>
>> >> Often, when you need or want more control than triggers provide, such
>> >> as
>> >> input-type-specific logic like yours, you can use state and timers in
>> >> ParDo
>> >> to control when to output. You lose any potential optimizations of
>> >> Combine
>> >> based on associativity/commutativity and assume the burden of making
>> >> sure
>> >> your output is sensible, but dropping to low-level stateful computation
>> >> may
>> >> be your best bet.
>> >>
>> >> Kenn
>> >>
>> >> On Tue, Jan 9, 2018 at 11:59 AM, Robert Bradshaw 
>> >> wrote:
>> >>>
>> >>> We've tossed around the idea of "metadata-driven" triggers which would
>> >>> essentially let you provide a mapping element -> metadata and a
>> >>> monotonic CombineFn metadata* -> bool that would allow for this (the
>> >>> AfterCount being a special case of this, with the mapping fn being _
>> >>> -> 1, and the CombineFn being sum(...) >= N, for size one would
>> >>> provide a (perhaps approximate) sizing mapping fn).
>> >>>
>> >>> Note, however, that there's no guarantee that the trigger fire as soon
>> >>> as possible; due to runtime characteristics a significant amount of
>> >>> data may be buffered (or come in at once) before the trigger is
>> >>> queried. One possibility would be to follow your triggering with a
>> >>> DoFn that breaks up large value streams into multiple manageable sized
>> >>> ones as needed.
>> >>>
>> >>> On Tue, Jan 9, 2018 at 11:43 AM, Carlos Alonso 
>> >>> wrote:
>> >>> > Hi everyone!!
>> >>> >
>> >>> > I was wondering if there is an option to trigger window panes based
>> >>> > on
>> >>> > the
>> >>> > size of the pane itself (rather than the number of elements).
>> >>> >
>> >>> > To provide a little bit more of context we're backing up a PubSub
>> >>> > topic
>> >>> > into
>> >>> > GCS with the "special" feature that, depending on the "type" of the
>> >>> > message,
>> >>> > the GCS destination is one or another.
>> >>> >
>> >>> > Messages' 'shape' published there is quite random, some of them are
>> >>> > very
>> >>> > frequent and small, some others very big but sparse... We have
>> >>> > around
>> >>> > 150
>> >>> > messages per second (in total) and we're firing every 15 minutes and
>> >>> > experiencing OOM errors, we've considered firing based on the number
>> >>> > of
>> >>> > items as well, but given the randomness of the input, I don't think
>> >>> > it
>> >>> > will
>> >>> > be a final solution either.
>> >>> >
>> >>> > Having a trigger based on size would be great, another option would
>> >>> > be
>> >>> > to
>> >>> > have a dynamic shards number for the PTransform that actually writes
>> >>> > the
>> >>> > files.
>> >>> >
>> >>> > What is your recommendation for this use case?
>> >>> >
>> >>> > Thanks!!
>> >>
>> >>
>> >


Re: KafkaIO reading from latest offset when pipeline fails on FlinkRunner

2018-01-10 Thread Raghu Angadi
How often does your pipeline checkpoint/snapshot? If the failure happens
before the first checkpoint, the pipeline could restart without any state,
in which case KafkaIO would read from latest offset. There is probably some
way to verify if pipeline is restarting from a checkpoint.

On Sun, Jan 7, 2018 at 10:57 PM, Sushil Ks  wrote:

> HI Aljoscha,
>The issue is let's say I consumed 100 elements in 5
> mins Fixed Window with *GroupByKey* and later I applied *ParDO* for all
> those elements. If there is an issue while processing element 70 in
> *ParDo *and the pipeline restarts with *UserCodeException *it's skipping
> the rest 30 elements. Wanted to know if this is expected? In case if you
> still having doubt let me know will share a code snippet.
>
> Regards,
> Sushil Ks
>


Re: KafkaIO reading from latest offset when pipeline fails on FlinkRunner

2018-01-10 Thread Mingmin Xu
@Sushil, I have several jobs running on KafkaIO+FlinkRunner, hope my
experience can help you a bit.

For short, `ENABLE_AUTO_COMMIT_CONFIG` doesn't meet your requirement, you
need to leverage exactly-once checkpoint/savepoint in Flink. The reason
is,  with `ENABLE_AUTO_COMMIT_CONFIG` KafkaIO commits offset after data is
read, and once job is restarted KafkaIO reads from last_committed_offset.

In my jobs, I enable external(external should be optional I think?)
checkpoint on exactly-once mode in Flink cluster. When the job auto-restart
on failures it doesn't lost data. In case of manually redeploy the job, I
use savepoint to cancel and launch the job.

Mingmin

On Wed, Jan 10, 2018 at 10:34 AM, Raghu Angadi  wrote:

> How often does your pipeline checkpoint/snapshot? If the failure happens
> before the first checkpoint, the pipeline could restart without any state,
> in which case KafkaIO would read from latest offset. There is probably some
> way to verify if pipeline is restarting from a checkpoint.
>
> On Sun, Jan 7, 2018 at 10:57 PM, Sushil Ks  wrote:
>
>> HI Aljoscha,
>>The issue is let's say I consumed 100 elements in 5
>> mins Fixed Window with *GroupByKey* and later I applied *ParDO* for all
>> those elements. If there is an issue while processing element 70 in
>> *ParDo *and the pipeline restarts with *UserCodeException *it's skipping
>> the rest 30 elements. Wanted to know if this is expected? In case if you
>> still having doubt let me know will share a code snippet.
>>
>> Regards,
>> Sushil Ks
>>
>
>


-- 

Mingmin


Request payload size exceeds the limit: 10485760 bytes

2018-01-10 Thread Unais Thachuparambil
I wrote a python dataflow job to read data from biqquery and do some
transform and save the result as bq table..

I tested with 8 days data it works fine - when I scaled to 180 days I’m
getting the below error

```"message": "Request payload size exceeds the limit: 10485760 bytes.",```


```pitools.base.py.exceptions.HttpError: HttpError accessing <
https://dataflow.googleapis.com/v1b3/projects/careem-mktg-dwh/locations/us-central1/jobs?alt=json>:
response: <{'status': '400', 'content-length': '145', 'x-xss-protection':
'1; mode=block', 'x-content-type-options': 'nosniff', 'transfer-encoding':
'chunked', 'vary': 'Origin, X-Origin, Referer', 'server': 'ESF',
'-content-encoding': 'gzip', 'cache-control': 'private', 'date': 'Wed, 10
Jan 2018 22:49:32 GMT', 'x-frame-options': 'SAMEORIGIN', 'alt-svc':
'hq=":443"; ma=2592000; quic=51303431; quic=51303339; quic=51303338;
quic=51303337; quic=51303335,quic=":443"; ma=2592000; v="41,39,38,37,35"',
'content-type': 'application/json; charset=UTF-8'}>, content <{
"error": {
"code": 400,
"message": "Request payload size exceeds the limit: 10485760 bytes.",
"status": "INVALID_ARGUMENT"
}

```


In short, this is what I’m doing
1 - Reading data from bigquery table using
```beam.io.BigQuerySource ```
2 - Partitioning each days using
``` beam.Partition ```
3- Applying transforms each partition and combining some output
P-Collections.
4- After the transforms, the results are saved to a biqquery date
partitioned table.


Re: Request payload size exceeds the limit: 10485760 bytes

2018-01-10 Thread Chamikara Jayalath
Dataflow service has a 10MB request size limit. Seems like you are hitting
this. See following for more information regarding this.
https://cloud.google.com/dataflow/pipelines/troubleshooting-your-pipeline

Looks like your are hitting this due to number of partitions. I don't think
currently there's a good solution other than to execute multiple jobs. We
hope to introduce dynamic destinations feature to Python BQ sink in the
near future which will allow you to write this using a more compact
pipeline.

Thanks,
Cham

On Wed, Jan 10, 2018 at 10:22 PM Unais Thachuparambil <
unais.thachuparam...@careem.com> wrote:

> I wrote a python dataflow job to read data from biqquery and do some
> transform and save the result as bq table..
>
> I tested with 8 days data it works fine - when I scaled to 180 days I’m
> getting the below error
>
> ```"message": "Request payload size exceeds the limit: 10485760 bytes.",```
>
>
> ```pitools.base.py.exceptions.HttpError: HttpError accessing <
> https://dataflow.googleapis.com/v1b3/projects/careem-mktg-dwh/locations/us-central1/jobs?alt=json>:
> response: <{'status': '400', 'content-length': '145', 'x-xss-protection':
> '1; mode=block', 'x-content-type-options': 'nosniff', 'transfer-encoding':
> 'chunked', 'vary': 'Origin, X-Origin, Referer', 'server': 'ESF',
> '-content-encoding': 'gzip', 'cache-control': 'private', 'date': 'Wed, 10
> Jan 2018 22:49:32 GMT', 'x-frame-options': 'SAMEORIGIN', 'alt-svc':
> 'hq=":443"; ma=2592000; quic=51303431; quic=51303339; quic=51303338;
> quic=51303337; quic=51303335,quic=":443"; ma=2592000; v="41,39,38,37,35"',
> 'content-type': 'application/json; charset=UTF-8'}>, content <{
> "error": {
> "code": 400,
> "message": "Request payload size exceeds the limit: 10485760 bytes.",
> "status": "INVALID_ARGUMENT"
> }
>
> ```
>
>
> In short, this is what I’m doing
> 1 - Reading data from bigquery table using
> ```beam.io.BigQuerySource ```
> 2 - Partitioning each days using
> ``` beam.Partition ```
> 3- Applying transforms each partition and combining some output
> P-Collections.
> 4- After the transforms, the results are saved to a biqquery date
> partitioned table.
>


Re: Request payload size exceeds the limit: 10485760 bytes

2018-01-10 Thread Unais Thachuparambil
Is it because of my output I'm participating writing to 180 partitions? Or
because of more pipeline operations & transforms

On Thu, Jan 11, 2018 at 10:48 AM, Chamikara Jayalath 
wrote:

> Dataflow service has a 10MB request size limit. Seems like you are hitting
> this. See following for more information regarding this.
> https://cloud.google.com/dataflow/pipelines/troubleshooting-your-pipeline
>
> Looks like your are hitting this due to number of partitions. I don't
> think currently there's a good solution other than to execute multiple
> jobs. We hope to introduce dynamic destinations feature to Python BQ sink
> in the near future which will allow you to write this using a more compact
> pipeline.
>
> Thanks,
> Cham
>
>
> On Wed, Jan 10, 2018 at 10:22 PM Unais Thachuparambil <
> unais.thachuparam...@careem.com> wrote:
>
>> I wrote a python dataflow job to read data from biqquery and do some
>> transform and save the result as bq table..
>>
>> I tested with 8 days data it works fine - when I scaled to 180 days I’m
>> getting the below error
>>
>> ```"message": "Request payload size exceeds the limit: 10485760
>> bytes.",```
>>
>>
>> ```pitools.base.py.exceptions.HttpError: HttpError accessing <
>> https://dataflow.googleapis.com/v1b3/projects/careem-mktg-
>> dwh/locations/us-central1/jobs?alt=json>: response: <{'status': '400',
>> 'content-length': '145', 'x-xss-protection': '1; mode=block',
>> 'x-content-type-options': 'nosniff', 'transfer-encoding': 'chunked',
>> 'vary': 'Origin, X-Origin, Referer', 'server': 'ESF', '-content-encoding':
>> 'gzip', 'cache-control': 'private', 'date': 'Wed, 10 Jan 2018 22:49:32
>> GMT', 'x-frame-options': 'SAMEORIGIN', 'alt-svc': 'hq=":443"; ma=2592000;
>> quic=51303431; quic=51303339; quic=51303338; quic=51303337;
>> quic=51303335,quic=":443"; ma=2592000; v="41,39,38,37,35"', 'content-type':
>> 'application/json; charset=UTF-8'}>, content <{
>> "error": {
>> "code": 400,
>> "message": "Request payload size exceeds the limit: 10485760 bytes.",
>> "status": "INVALID_ARGUMENT"
>> }
>>
>> ```
>>
>>
>> In short, this is what I’m doing
>> 1 - Reading data from bigquery table using
>> ```beam.io.BigQuerySource ```
>> 2 - Partitioning each days using
>> ``` beam.Partition ```
>> 3- Applying transforms each partition and combining some output
>> P-Collections.
>> 4- After the transforms, the results are saved to a biqquery date
>> partitioned table.
>>
>


Re: Request payload size exceeds the limit: 10485760 bytes

2018-01-10 Thread Chamikara Jayalath
It's due to the size of the JSON serialized Dataflow pipeline (number of
transforms and serialized size of these transforms).

On Wed, Jan 10, 2018 at 11:40 PM Unais Thachuparambil <
unais.thachuparam...@careem.com> wrote:

> Is it because of my output I'm participating writing to 180 partitions? Or
> because of more pipeline operations & transforms
>
> On Thu, Jan 11, 2018 at 10:48 AM, Chamikara Jayalath  > wrote:
>
>> Dataflow service has a 10MB request size limit. Seems like you are
>> hitting this. See following for more information regarding this.
>> https://cloud.google.com/dataflow/pipelines/troubleshooting-your-pipeline
>>
>> Looks like your are hitting this due to number of partitions. I don't
>> think currently there's a good solution other than to execute multiple
>> jobs. We hope to introduce dynamic destinations feature to Python BQ sink
>> in the near future which will allow you to write this using a more compact
>> pipeline.
>>
>> Thanks,
>> Cham
>>
>>
>> On Wed, Jan 10, 2018 at 10:22 PM Unais Thachuparambil <
>> unais.thachuparam...@careem.com> wrote:
>>
>>> I wrote a python dataflow job to read data from biqquery and do some
>>> transform and save the result as bq table..
>>>
>>> I tested with 8 days data it works fine - when I scaled to 180 days I’m
>>> getting the below error
>>>
>>> ```"message": "Request payload size exceeds the limit: 10485760
>>> bytes.",```
>>>
>>>
>>> ```pitools.base.py.exceptions.HttpError: HttpError accessing <
>>> https://dataflow.googleapis.com/v1b3/projects/careem-mktg-dwh/locations/us-central1/jobs?alt=json>:
>>> response: <{'status': '400', 'content-length': '145', 'x-xss-protection':
>>> '1; mode=block', 'x-content-type-options': 'nosniff', 'transfer-encoding':
>>> 'chunked', 'vary': 'Origin, X-Origin, Referer', 'server': 'ESF',
>>> '-content-encoding': 'gzip', 'cache-control': 'private', 'date': 'Wed, 10
>>> Jan 2018 22:49:32 GMT', 'x-frame-options': 'SAMEORIGIN', 'alt-svc':
>>> 'hq=":443"; ma=2592000; quic=51303431; quic=51303339; quic=51303338;
>>> quic=51303337; quic=51303335,quic=":443"; ma=2592000; v="41,39,38,37,35"',
>>> 'content-type': 'application/json; charset=UTF-8'}>, content <{
>>> "error": {
>>> "code": 400,
>>> "message": "Request payload size exceeds the limit: 10485760 bytes.",
>>> "status": "INVALID_ARGUMENT"
>>> }
>>>
>>> ```
>>>
>>>
>>> In short, this is what I’m doing
>>> 1 - Reading data from bigquery table using
>>> ```beam.io.BigQuerySource ```
>>> 2 - Partitioning each days using
>>> ``` beam.Partition ```
>>> 3- Applying transforms each partition and combining some output
>>> P-Collections.
>>> 4- After the transforms, the results are saved to a biqquery date
>>> partitioned table.
>>>
>>
>