RE: Re: Default Kafka producers pool size for FlinkKafkaProducer.Semantic.EXACTLY_ONCE

2019-04-11 Thread min.tan
Many thanks for your replies.

After I increased MinPauseBetweenCheckpoints and moved to a memory backend for 
checkpoints, the exception disappeared.
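
For anyone searching the archives later, the two settings involved look roughly 
like this; the concrete values below are illustrative, not the exact ones from 
our job:

env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10_000); // was 1_000; illustrative value
env.setStateBackend(new MemoryStateBackend(64 * 1024 * 1024));   // max state size in bytes; illustrative value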

Thank you both again for your help.


Regards,

Min
From: Piotr Nowojski [mailto:pi...@ververica.com]
Sent: Thursday, 11 April 2019 15:01
To: Fabian Hueske
Cc: Tan, Min; user
Subject: [External] Re: Default Kafka producers pool size for 
FlinkKafkaProducer.Semantic.EXACTLY_ONCE

Hi Min and Fabian,

The pool size is independent of the parallelism, the task slot count, and the 
task manager count. The only thing you should consider is how many simultaneous 
checkpoints you might have in your setup.

As Fabian wrote, with
> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

the default pool size of 5 should be more than enough.
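
If you do need a larger pool, it can be passed when constructing the producer. 
A minimal sketch, assuming the universal Kafka connector (FlinkKafkaProducer011 
has an analogous overload); the topic name, properties and pool size of 8 are 
illustrative assumptions:

import java.util.Optional;
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;

Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "localhost:9092"); // illustrative
// Keep this at or below transaction.max.timeout.ms on the brokers (15 minutes by default).
producerProps.setProperty("transaction.timeout.ms", "900000");

FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
    "my-topic",                                                       // hypothetical topic
    new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),  // plain String records
    producerProps,
    Optional.empty(),                                                 // no custom Flink partitioner
    FlinkKafkaProducer.Semantic.EXACTLY_ONCE,
    8);                                                               // kafkaProducersPoolSize (default 5)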

Could you double-check whether something is overriding those configuration 
values? If not, could you provide the JobManager and TaskManager logs?

Piotrek


On 11 Apr 2019, at 09:32, Fabian Hueske <fhue...@gmail.com> wrote:

Hi Min,

I think the pool size is per parallel sink task, i.e., it should be independent 
of the parallelism of the sink operator.
From my understanding, a pool size of 5 should be fine if the maximum number of 
concurrent checkpoints is 1.
Running out of connections would mean that there are 5 in-flight checkpoints 
that were not completed, which seems a lot to me (given that the sink is 
probably at the end of the program).
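
In other words, the pool only needs to cover the checkpoints that can be in 
flight at the same time. As a rule of thumb (my reading of the behaviour, not 
an official formula):

// Keep the producer pool strictly larger than the number of checkpoints
// that may be in flight at once.
int maxConcurrent = env.getCheckpointConfig().getMaxConcurrentCheckpoints();
int poolSize = Math.max(FlinkKafkaProducer.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE, maxConcurrent + 1);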

If I remember correctly, Piotr (in CC) was working on the exactly-once feature 
of the Kafka producer.
Maybe he can help.

Best,
Fabian

On Mon, 8 Apr 2019 at 14:43, <min@ubs.com> wrote:
Hi,

I keep getting the exception 
"org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Too many 
ongoing snapshots. Increase kafka producers pool size or decrease number of 
concurrent checkpoints."

I understand that DEFAULT_KAFKA_PRODUCERS_POOL_SIZE is 5 and that I need to 
increase this size. What should I consider when increasing it? What would be 
a typical size, e.g. 32?

I run with a parallelism of 16 and have a checkpoint configuration like this:

public static void setup(StreamExecutionEnvironment env) {
    // Trigger a checkpoint every 2 seconds.
    env.enableCheckpointing(2_000);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    // Wait at least 1 second between the end of one checkpoint and the start of the next.
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1_000);
    // Abort checkpoints not completed within 60 seconds.
    env.getCheckpointConfig().setCheckpointTimeout(60_000);
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    // In-memory state backend; the argument is the maximum state size in bytes (~32 MB here).
    env.setStateBackend(new MemoryStateBackend(Integer.MAX_VALUE / 64));

    // env.getCheckpointConfig().enableExternalizedCheckpoints(
    //     CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
}
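
For completeness, the job is wired up roughly like this (the source and names 
are placeholders rather than our actual topology, and kafkaProducer stands for 
a FlinkKafkaProducer built with Semantic.EXACTLY_ONCE):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
setup(env);
env.setParallelism(16); // the parallelism mentioned above

DataStream<String> events = env.socketTextStream("localhost", 9999); // placeholder source
events.addSink(kafkaProducer); // exactly-once Kafka sink
env.execute("exactly-once-kafka-job");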

Regards,

Min



Re: Default Kafka producers pool size for FlinkKafkaProducer.Semantic.EXACTLY_ONCE

2019-04-11 Thread Piotr Nowojski
Hi Min and Fabian,

The pool size is independent of the parallelism, the task slot count, and the 
task manager count. The only thing you should consider is how many simultaneous 
checkpoints you might have in your setup.

As Fabian wrote, with
> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

the default pool size of 5 should be more than enough.

Could you double-check whether something is overriding those configuration 
values? If not, could you provide the JobManager and TaskManager logs?

Piotrek

> On 11 Apr 2019, at 09:32, Fabian Hueske <fhue...@gmail.com> wrote:
> 
> Hi Min,
> 
> I think the pool size is per parallel sink task, i.e., it should be 
> independent of the parallelism of the sink operator.
> From my understanding a pool size of 5 should be fine if the maximum number 
> of concurrent checkpoints is 1.
> Running out of connections would mean that there are 5 in-flight checkpoints 
> that were not completed, which seems a lot to me (given that the sink is 
> probably at the end of the program).
> 
> If I remember correctly, Piotr (in CC) was working on the exactly-once 
> feature of the Kafka producer.
> Maybe he can help.
> 
> Best,
> Fabian
> 
> On Mon, 8 Apr 2019 at 14:43, <min@ubs.com> wrote:
> Hi,
>
> I keep getting the exception
> "org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Too many
> ongoing snapshots. Increase kafka producers pool size or decrease number of
> concurrent checkpoints."
>
> I understand that DEFAULT_KAFKA_PRODUCERS_POOL_SIZE is 5 and that I need to
> increase this size. What should I consider when increasing it? What would be
> a typical size, e.g. 32?
>
> I run with a parallelism of 16 and have a checkpoint configuration like this:
>
> public static void setup(StreamExecutionEnvironment env) {
>     env.enableCheckpointing(2_000);
>     env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>     env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1_000);
>     env.getCheckpointConfig().setCheckpointTimeout(60_000);
>     env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
>     env.setStateBackend(new MemoryStateBackend(Integer.MAX_VALUE / 64));
>
>     // env.getCheckpointConfig().enableExternalizedCheckpoints(
>     //     CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> }
>
> Regards,
>
> Min



Re: Default Kafka producers pool size for FlinkKafkaProducer.Semantic.EXACTLY_ONCE

2019-04-11 Thread Fabian Hueske
Hi Min,

I think the pool size is per parallel sink task, i.e., it should be
independent of the parallelism of the sink operator.
From my understanding, a pool size of 5 should be fine if the maximum number
of concurrent checkpoints is 1.
Running out of connections would mean that there are 5 in-flight
checkpoints that were not completed, which seems a lot to me (given that
the sink is probably at the end of the program).

If I remember correctly, Piotr (in CC) was working on the exactly-once
feature of the Kafka producer.
Maybe he can help.

Best,
Fabian

On Mon, 8 Apr 2019 at 14:43, <min@ubs.com> wrote:

> Hi,
>
> I keep getting the exception
> "org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Too many
> ongoing snapshots. Increase kafka producers pool size or decrease number of
> concurrent checkpoints."
>
> I understand that DEFAULT_KAFKA_PRODUCERS_POOL_SIZE is 5 and that I need to
> increase this size. What should I consider when increasing it? What would be
> a typical size, e.g. 32?
>
> I run with a parallelism of 16 and have a checkpoint configuration like this:
>
> public static void setup(StreamExecutionEnvironment env) {
>     env.enableCheckpointing(2_000);
>     env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>     env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1_000);
>     env.getCheckpointConfig().setCheckpointTimeout(60_000);
>     env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
>     env.setStateBackend(new MemoryStateBackend(Integer.MAX_VALUE / 64));
>
>     // env.getCheckpointConfig().enableExternalizedCheckpoints(
>     //     CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> }
>
> Regards,
>
> Min


Default Kafka producers pool size for FlinkKafkaProducer.Semantic.EXACTLY_ONCE

2019-04-08 Thread min.tan
Hi,

I keep getting the exception 
"org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Too many 
ongoing snapshots. Increase kafka producers pool size or decrease number of 
concurrent checkpoints."

I understand that DEFAULT_KAFKA_PRODUCERS_POOL_SIZE is 5 and that I need to 
increase this size. What should I consider when increasing it? What would be 
a typical size, e.g. 32?

I run with a parallelism of 16 and have a checkpoint configuration like this:

public static void setup(StreamExecutionEnvironment env) {
    env.enableCheckpointing(2_000);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1_000);
    env.getCheckpointConfig().setCheckpointTimeout(60_000);
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    env.setStateBackend(new MemoryStateBackend(Integer.MAX_VALUE / 64));

    // env.getCheckpointConfig().enableExternalizedCheckpoints(
    //     CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
}

Regards,

Min


Default Kafka producers pool size for FlinkKafkaProducer.Semantic.EXACTLY_ONCE

2019-04-05 Thread min.tan
Hi,

 

I keep getting the exception
"org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Too many
ongoing snapshots. Increase kafka producers pool size or decrease number of
concurrent checkpoints."

I think that DEFAULT_KAFKA_PRODUCERS_POOL_SIZE is 5 and that I need to
increase this size. What should I consider when increasing it?

I run with a parallelism of 16 and have a checkpoint configuration like this:

 

public static void setup(StreamExecutionEnvironment env) {
    env.enableCheckpointing(2_000);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1_000);
    env.getCheckpointConfig().setCheckpointTimeout(10_000);
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    env.setStateBackend(new MemoryStateBackend(Integer.MAX_VALUE));

    // env.getCheckpointConfig().enableExternalizedCheckpoints(
    //     CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
}

 

Regards,

 

Min


