Hi Min and Fabian,

The pool size is independent of the parallelism, the number of task slots, 
and the number of task managers. The only thing you need to consider is how 
many simultaneous checkpoints you might have in your setup.

As Fabian wrote, with
> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

the default pool size of 5 should be more than enough.
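
To illustrate the reasoning, here is a minimal sketch in plain Java. It is a toy model, not Flink's actual implementation: the class ProducerPoolSketch and its methods are hypothetical names, and it assumes that each snapshot takes one producer from the pool per sink task and that the producer is returned once the checkpoint completes. Note that parallelism does not appear anywhere in it.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model (NOT Flink code) of the producer pool of one parallel sink task. */
final class ProducerPoolSketch {
    private final Deque<Integer> available = new ArrayDeque<>();
    private final Deque<Integer> pendingCommit = new ArrayDeque<>();
    private final int poolSize;
    private int current;    // producer used by the currently open transaction
    private int peakInUse;  // highest number of producers in use at once

    ProducerPoolSketch(int poolSize) {
        this.poolSize = poolSize;
        for (int id = 0; id < poolSize; id++) {
            available.add(id);
        }
        current = acquire();
    }

    private int acquire() {
        Integer id = available.poll();
        if (id == null) {
            // Assumed to correspond to the "Too many ongoing snapshots" error.
            throw new IllegalStateException("Too many ongoing snapshots");
        }
        peakInUse = Math.max(peakInUse, poolSize - available.size());
        return id;
    }

    /** Snapshot: the current transaction waits for commit, a new one is opened. */
    void snapshot() {
        pendingCommit.add(current);
        current = acquire();
    }

    /** Checkpoint completed: the oldest pending producer goes back to the pool. */
    void checkpointCompleted() {
        Integer id = pendingCommit.poll();
        if (id != null) {
            available.add(id);
        }
    }

    /** Runs `checkpoints` snapshots with at most `maxConcurrent` uncompleted ones. */
    static int simulate(int poolSize, int maxConcurrent, int checkpoints) {
        ProducerPoolSketch pool = new ProducerPoolSketch(poolSize);
        int inFlight = 0;
        for (int i = 0; i < checkpoints; i++) {
            if (inFlight == maxConcurrent) {
                pool.checkpointCompleted();
                inFlight--;
            }
            pool.snapshot();
            inFlight++;
        }
        return pool.peakInUse;
    }

    public static void main(String[] args) {
        // Pool of 5, one concurrent checkpoint: prints 2 in this model.
        System.out.println(simulate(5, 1, 100));
    }
}
```

In this model a pool of 5 is only exhausted when 5 checkpoints are outstanding at the same time, so hitting the exception with a single concurrent checkpoint suggests that checkpoints are not completing or that the settings are not the ones actually in effect.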

Could you double-check whether something is overriding those configuration 
values? If not, could you provide the JobManager and TaskManager logs?

Piotrek

> On 11 Apr 2019, at 09:32, Fabian Hueske <fhue...@gmail.com> wrote:
> 
> Hi Min,
> 
> I think the pool size is per parallel sink task, i.e., it should be 
> independent of the parallelism of the sink operator.
> From my understanding, a pool size of 5 should be fine if the maximum number 
> of concurrent checkpoints is 1.
> Running out of connections would mean that there are 5 in-flight checkpoints 
> that were not completed, which seems like a lot to me (given that the sink is 
> probably at the end of the program).
> 
> If I remember correctly, Piotr (in CC) was working on the exactly-once 
> feature of the Kafka producer.
> Maybe he can help.
> 
> Best,
> Fabian
> 
> On Mon, 8 Apr 2019 at 14:43, <min....@ubs.com> wrote:
> Hi,
> 
> I keep getting the exception 
> "org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Too many 
> ongoing snapshots. Increase kafka producers pool size or decrease number of 
> concurrent checkpoints."
> 
> I understand that DEFAULT_KAFKA_PRODUCERS_POOL_SIZE is 5 and that I need to 
> increase it. What should I consider when choosing a new size? What is a 
> reasonable value for a normal setup, e.g. 32?
> 
> I run with a parallelism of 16 and have the following checkpoint settings:
> 
> public static void setup(StreamExecutionEnvironment env) {
>     env.enableCheckpointing(2_000);
>     env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>     env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1_000);
>     env.getCheckpointConfig().setCheckpointTimeout(60_000);
>     env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
>     env.setStateBackend(new MemoryStateBackend(Integer.MAX_VALUE/64));
>     //env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> }
> 
> Regards,
> Min
> 
