RE: Distribute Parallelism/Tasks within RichOutputFormat?

2020-12-23 Thread Hailu, Andreas [Engineering]
Thanks Chesnay, Flavio – I believe Flavio’s first recommendation will work well 
enough. I agree that the second approach may be a bit finicky to use long-term.

Cheers.

// ah

From: Chesnay Schepler 
Sent: Wednesday, December 23, 2020 4:07 AM
To: Flavio Pompermaier; Hailu, Andreas [Engineering]
Cc: user@flink.apache.org
Subject: Re: Distribute Parallelism/Tasks within RichOutputFormat?

Essentially I see 2 options here:
a) split your output format such that each format is its own sink, and then 
follow Flavio's suggestion to filter the stream and apply each sink to one of 
the streams, with the respective parallelism. This would be the recommended 
approach.
b) modify your (custom?) output format to only create one of the Hadoop output 
formats within open() based on the subtask index, and apply a custom 
partitioner onto the input datastream that routes the elements based on the 
conditions to the respective subtasks. I would not recommend this though, 
because it could be quite a headache maintenance-wise.
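For readers weighing option (b), the routing it describes can be sketched in plain Java. This is a minimal, hypothetical sketch: `RoutingPartitioner`, `goesToA`, and `subtasksForA` are illustrative names, and the local `Partitioner` interface only mirrors the shape of Flink's `org.apache.flink.api.common.functions.Partitioner` (`int partition(K key, int numPartitions)`) so that the example compiles without Flink on the classpath. It assumes records matching the condition go to subtasks `0..subtasksForA-1`, the rest go to the remaining subtasks, and the output format's `open()` creates only A or B based on the subtask index it receives (the `taskNumber` argument in the Flink 1.x `OutputFormat` interface).

```java
import java.util.function.Predicate;

public class RoutingSketch {

    // Local stand-in with the same shape as Flink's Partitioner<K>; not the Flink class itself.
    interface Partitioner<K> {
        int partition(K key, int numPartitions);
    }

    // Hypothetical partitioner: records matching goesToA land on subtasks [0, subtasksForA),
    // all other records on subtasks [subtasksForA, numPartitions).
    static final class RoutingPartitioner<K> implements Partitioner<K> {
        private final Predicate<K> goesToA;
        private final int subtasksForA;

        RoutingPartitioner(Predicate<K> goesToA, int subtasksForA) {
            this.goesToA = goesToA;
            this.subtasksForA = subtasksForA;
        }

        @Override
        public int partition(K key, int numPartitions) {
            if (subtasksForA <= 0 || subtasksForA >= numPartitions) {
                throw new IllegalArgumentException("need 0 < subtasksForA < numPartitions");
            }
            int subtasksForB = numPartitions - subtasksForA;
            // Spread records within their range by hash; floorMod keeps the index non-negative.
            int h = key.hashCode();
            return goesToA.test(key)
                    ? Math.floorMod(h, subtasksForA)
                    : subtasksForA + Math.floorMod(h, subtasksForB);
        }
    }

    // The decision the output format's open() would make from its subtask index:
    // low indices create only Hadoop format A, high indices only format B.
    static String outputForSubtask(int subtaskIndex, int subtasksForA) {
        return subtaskIndex < subtasksForA ? "A" : "B";
    }

    public static void main(String[] args) {
        int parallelism = 10;
        int subtasksForA = 7;
        // Hypothetical condition: integers whose last digit is below 7 belong to output A.
        RoutingPartitioner<Integer> partitioner =
                new RoutingPartitioner<>(x -> x % 10 < 7, subtasksForA);
        for (int record = 0; record < 20; record++) {
            int subtask = partitioner.partition(record, parallelism);
            System.out.println(record + " -> subtask " + subtask + " (output "
                    + outputForSubtask(subtask, subtasksForA) + ")");
        }
    }
}
```

In a Flink job, a partitioner of this shape would be attached with `DataStream#partitionCustom(partitioner, keySelector)`. The maintenance cost Chesnay mentions is visible here: the partitioner and the format's `open()` must agree on the same `subtasksForA` split, and the sink's parallelism must equal `numPartitions`, so changing either side alone silently misroutes records.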

On 12/23/2020 9:53 AM, Flavio Pompermaier wrote:
I'm not an expert on the streaming APIs, but you could try something like 
this:

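DataStream<Record> ds = ...; // the incoming stream (placeholder)
// Complementary predicates (elided) split records between the two outputs;
// each branch can then get its own sink with its own parallelism.
DataStream<Record> ds1 = ds.filter(...).setParallelism(3);
DataStream<Record> ds2 = ds.filter(...).setParallelism(7);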

Could it fit your needs?

Best,
Flavio

On Wed, Dec 23, 2020 at 3:54 AM Hailu, Andreas [Engineering] 
<andreas.ha...@gs.com> wrote:
Hi folks,

I’ve got a single RichOutputFormat which is comprised of two 
HadoopOutputFormats, let’s call them A and B, each writing to different HDFS 
directories. If a Record matches a certain condition it’s written using A, 
otherwise it’s written with B. Currently, the parallelism that is set on the 
RichOutputFormat seems to propagate to both A & B – meaning that if the parallelism 
set on the RichOutputFormat is 10, outputs A and B each create 10 files even if A 
receives all the records and B receives none.

My app knows the ratio of records it expects to send to output A vs. 
output B, and I would ideally like to pass that down through the 
RichOutputFormat. That is, if we have a parallelism of 10 and know that 
70% of the Records go to A, I would like to give A a parallelism of 7 
and B a parallelism of 3.
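The split described above is a small calculation. Below is a minimal sketch with a hypothetical helper (`ParallelismSplit.split`, not part of Flink) that rounds `total * shareA` and, as an assumption of this sketch, clamps the result so that each output keeps at least one subtask.

```java
public class ParallelismSplit {

    // Hypothetical helper: returns {parallelismA, parallelismB}, summing to total,
    // with each output given at least one subtask.
    static int[] split(int total, double shareA) {
        if (total < 2) {
            throw new IllegalArgumentException("total must be >= 2 to give each output a subtask");
        }
        if (shareA < 0.0 || shareA > 1.0) {
            throw new IllegalArgumentException("shareA must be in [0, 1]");
        }
        int a = (int) Math.round(total * shareA);
        // Clamp so neither output ends up with zero subtasks.
        a = Math.max(1, Math.min(total - 1, a));
        return new int[] {a, total - a};
    }

    public static void main(String[] args) {
        int[] p = split(10, 0.7); // 70% of records expected to go to A
        System.out.println("A=" + p[0] + ", B=" + p[1]); // prints "A=7, B=3"
    }
}
```

The clamp means an expected share of 100% still yields 9 and 1 rather than 10 and 0; if one output is known to receive nothing, it may be simpler not to create that sink at all.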

I’m curious because the current approach can lead to lots of redundant empty 
files, and I’d like to minimize that if possible. Is something like this 
supported?



Andreas Hailu
Data Lake Engineering | Goldman Sachs & Co.




Your Personal Data: We may collect and process information about you that may 
be subject to data protection laws. For more information about how we use and 
disclose your personal data, how we protect your information, our legal basis 
to use your information, your rights and who you can contact, please refer to: 
www.gs.com/privacy-notices



Re: Distribute Parallelism/Tasks within RichOutputFormat?

2020-12-23 Thread Chesnay Schepler

Essentially I see 2 options here:
a) split your output format such that each format is its own sink, and 
then follow Flavio's suggestion to filter the stream and apply each sink 
to one of the streams, with the respective parallelism. This would be 
the recommended approach.
b) modify your (custom?) output format to only create one of the Hadoop 
output formats within open() based on the subtask index, and apply a 
custom partitioner onto the input datastream that routes the elements 
based on the conditions to the respective subtasks. I would not 
recommend this though, because it could be quite a headache 
maintenance-wise.


Re: Distribute Parallelism/Tasks within RichOutputFormat?

2020-12-23 Thread Flavio Pompermaier
I'm not an expert on the streaming APIs, but you could try something
like this:

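DataStream<Record> ds = ...; // the incoming stream (placeholder)
// Complementary predicates (elided) split records between the two outputs;
// each branch can then get its own sink with its own parallelism.
DataStream<Record> ds1 = ds.filter(...).setParallelism(3);
DataStream<Record> ds2 = ds.filter(...).setParallelism(7);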

Could it fit your needs?

Best,
Flavio
