Re: kafka consumer parallelism

2017-10-05 Thread r. r.
Thanks a lot, Carst!
I hadn't realized that.

Best regards







Re: kafka consumer parallelism

2017-10-05 Thread Carst Tankink
Hi,

The latter: the map() is spread out only if you rebalance before it.
You can also see this in the Flink dashboard screenshot you shared: the Source 
and the map are in the same ‘block’, so the operators are chained into the same 
task (and run at the same parallelism, in the same slot).
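
To make the chaining point concrete, here is a minimal plain-Java sketch. It is 
a model, not Flink API code: the class and method names are made up for 
illustration, and it assumes a single-partition topic (so only source subtask 0 
emits records) and a round-robin rebalance that starts at subtask 0. It counts 
how many map() calls each subtask would execute for the two orderings.

```java
import java.util.Arrays;

// Plain-Java model (hypothetical names, not Flink API): counts how many map()
// calls each parallel subtask would execute for the two orderings in this thread.
final class ChainingSketch {

    // Assumption: the topic has one partition, so only source subtask 0 emits.
    // Assumption: rebalance is modelled as round-robin starting at subtask 0.
    static int[] mapCallsPerSubtask(int records, int parallelism, boolean rebalanceBeforeMap) {
        int[] calls = new int[parallelism];
        for (int i = 0; i < records; i++) {
            int sourceSubtask = 0; // the only subtask that owns a partition
            // rebalance().map(..): records are redistributed before the map runs.
            // map(..).rebalance(): the map is chained to the source, so it runs
            // in the source's subtask and only its output is redistributed.
            int mapSubtask = rebalanceBeforeMap ? i % parallelism : sourceSubtask;
            calls[mapSubtask]++;
        }
        return calls;
    }

    public static void main(String[] args) {
        // messageStream.map(...).rebalance() -> prints [3, 0, 0, 0, 0]
        System.out.println(Arrays.toString(mapCallsPerSubtask(3, 5, false)));
        // messageStream.rebalance().map(..)  -> prints [1, 1, 1, 0, 0]
        System.out.println(Arrays.toString(mapCallsPerSubtask(3, 5, true)));
    }
}
```

With 3 documents and parallelism 5, the chained variant does all of the map work 
in one subtask, while rebalancing first spreads it over three of them.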


Carst 





Re: kafka consumer parallelism

2017-10-04 Thread r. r.
Thanks Timo & Tovi, this helped me get a better idea of how it works.

@Carst, I have the rebalance after the map() 
(messageStream.map(...).rebalance()). Doesn't that mean the load will be 
redistributed across all task managers' slots anyway?
Or is the map() spread out only if I do as you suggest, 
messageStream.rebalance().map(..)?

Best regards
Rob










Re: kafka consumer parallelism

2017-10-03 Thread Carst Tankink
(Accidentally sent this to Timo instead of to the list...)

Hi,

What Timo says is true, but in case you have a higher parallelism than the 
number of partitions (because you want to make use of it in a later operator), 
you could do a .rebalance() (see 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/datastream_api.html#physical-partitioning) 
after the Kafka source.
This makes sure that all operators after the Kafka source get an even load, at 
the cost of having to redistribute the documents (which adds 
serialization/deserialization and network overhead).
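
As a rough illustration of what the redistribution does, the following 
plain-Java sketch deals records out to downstream subtasks in turn. It is only 
a model of round-robin distribution, not Flink code: the class and method names 
are hypothetical, and starting at subtask 0 is a simplification.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model (hypothetical names, not Flink API) of round-robin
// redistribution from one active source subtask to all downstream subtasks.
final class RebalanceSketch {

    // Deals the records out to the downstream subtasks one at a time, in order.
    static List<List<String>> rebalance(List<String> records, int downstreamParallelism) {
        List<List<String>> channels = new ArrayList<>();
        for (int i = 0; i < downstreamParallelism; i++) {
            channels.add(new ArrayList<>());
        }
        for (int i = 0; i < records.size(); i++) {
            channels.get(i % downstreamParallelism).add(records.get(i));
        }
        return channels;
    }

    public static void main(String[] args) {
        List<String> docs = Arrays.asList("d1", "d2", "d3", "d4", "d5", "d6", "d7");
        // prints [[d1, d6], [d2, d7], [d3], [d4], [d5]]
        System.out.println(rebalance(docs, 5));
    }
}
```

Every record crosses a subtask boundary here, which is where the serialization 
and network cost mentioned above comes from in a real job.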


Carst







RE: kafka consumer parallelism

2017-10-03 Thread Sofer, Tovi
Hi Robert,

I had a similar issue.
For me the problem was that the topic was auto-created with one partition.
You can alter it to have 5 partitions using the kafka-topics command.
Example:
kafka-topics --alter --partitions 5 --topic fix --zookeeper localhost:2181
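
Why the partition count matters can be sketched in plain Java. Each partition 
is read by exactly one consumer subtask, so with fewer partitions than subtasks 
some subtasks stay idle. The modulo assignment rule and all names below are 
assumptions for illustration; the connector's actual assignment logic is not 
reproduced here.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (hypothetical names) of how partitions map to consumer subtasks.
final class PartitionAssignmentSketch {

    // Assumed rule for illustration only: partition p is read by subtask p % parallelism.
    static List<List<Integer>> assign(int numPartitions, int parallelism) {
        List<List<Integer>> perSubtask = new ArrayList<>();
        for (int s = 0; s < parallelism; s++) {
            perSubtask.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            perSubtask.get(p % parallelism).add(p);
        }
        return perSubtask;
    }

    // Number of subtasks that own no partition and therefore never receive records.
    static int idleSubtasks(int numPartitions, int parallelism) {
        int idle = 0;
        for (List<Integer> owned : assign(numPartitions, parallelism)) {
            if (owned.isEmpty()) {
                idle++;
            }
        }
        return idle;
    }

    public static void main(String[] args) {
        // One partition, parallelism 5: prints [[0], [], [], [], []] idle=4
        System.out.println(assign(1, 5) + " idle=" + idleSubtasks(1, 5));
        // Five partitions, parallelism 5: prints [[0], [1], [2], [3], [4]] idle=0
        System.out.println(assign(5, 5) + " idle=" + idleSubtasks(5, 5));
    }
}
```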

Regards,
Tovi




Re: kafka consumer parallelism

2017-10-02 Thread Timo Walther

Hi,

I'm not a Kafka expert, but I think you need to have more than 1 Kafka 
partition to process multiple documents at the same time. Also make sure 
to send the documents to different partitions.
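
A small plain-Java sketch of the second point: even with several partitions, 
documents that share a key end up in the same partition. The hash used below 
(String.hashCode modulo the partition count) is only a stand-in for 
illustration, not Kafka's real partitioner, and the names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java model (hypothetical names) of key-based spreading over partitions.
final class ProducerSpreadSketch {

    // Stand-in hash for illustration; Kafka's own partitioner is not reproduced here.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Counts how many records land in each partition for the given keys.
    static int[] recordsPerPartition(List<String> keys, int numPartitions) {
        int[] counts = new int[numPartitions];
        for (String key : keys) {
            counts[partitionFor(key, numPartitions)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // Distinct keys, 5 partitions: prints [0, 0, 1, 1, 1]
        System.out.println(Arrays.toString(recordsPerPartition(Arrays.asList("a", "b", "c"), 5)));
        // Same key three times, 5 partitions: prints [0, 0, 3, 0, 0]
        System.out.println(Arrays.toString(recordsPerPartition(Arrays.asList("a", "a", "a"), 5)));
        // Any keys, 1 partition: prints [3]
        System.out.println(Arrays.toString(recordsPerPartition(Arrays.asList("a", "b", "c"), 1)));
    }
}
```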


Regards,
Timo







kafka consumer parallelism

2017-10-02 Thread r. r.
Hello,
I'm running a job with "flink run -p5" and additionally set 
env.setParallelism(5).
The source of the stream is Kafka; the job uses FlinkKafkaConsumer010.
In the Flink UI, though, I notice that if I send 3 documents to Kafka, only one 
'instance' of the consumer seems to receive the Kafka records and send them to 
the next operators, which according to the Flink UI are properly parallelized.
What's the explanation for this behavior?
According to the sources:

To enable parallel execution, the user defined source should
     * implement {@link org.apache.flink.streaming.api.functions.source.ParallelSourceFunction} or
     * extend {@link org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction}

which FlinkKafkaConsumer010 does.

Please check the screenshot at https://imgur.com/a/E1H9r : you'll see that only 
one instance sends 3 records to the sinks.

My code is here: https://pastebin.com/yjYCXAAR

Thanks!