Hi Ajay,

With a parallelism of 3 you will have 3 independent clients. If you want to
keep the prefetch count at 3, you need to set setRequestedChannelMax to 1
and setParallelism to 3. That way all 3 clients can each have one connection.
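
To make the arithmetic behind "3 independent clients" concrete, here is a
minimal sketch in plain Java. It assumes each parallel source subtask opens
its own connection and applies the configured prefetch count to it
independently, so the limits add up across subtasks. The class and method
names (PrefetchBudget, totalInFlight, prefetchPerClient) are hypothetical
and are not part of Flink or the RabbitMQ client.

```java
// Sketch only: models how a per-client prefetch limit adds up across
// parallel source subtasks. Assumes every subtask is an independent client.
class PrefetchBudget {

    /** Upper bound on unacknowledged messages in flight across all clients. */
    static int totalInFlight(int parallelism, int prefetchPerClient) {
        return parallelism * prefetchPerClient;
    }

    /** Largest per-client prefetch that keeps the total within the budget. */
    static int prefetchPerClient(int totalBudget, int parallelism) {
        if (parallelism <= 0 || totalBudget < parallelism) {
            throw new IllegalArgumentException(
                "need at least one message of budget per client");
        }
        return totalBudget / parallelism;
    }

    public static void main(String[] args) {
        int parallelism = 3;
        // Prefetch 3 on each of 3 clients: up to 9 messages in flight.
        System.out.println(totalInFlight(parallelism, 3));
        // To cap the total at 3, each client gets a prefetch of 1.
        System.out.println(prefetchPerClient(3, parallelism));
    }
}
```

Under that assumption, the per-client value is what would be passed to
setPrefetchCount on the RMQConnectionConfig builder, with setParallelism(3)
on the source.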

Talat

On Tue, May 7, 2024 at 5:52 AM ajay pandey <ajaypandey3...@gmail.com> wrote:

> Hi Flink Team,
>
>
> I am currently reading streaming data from RabbitMQ, using
> RMQConnectionConfig to establish the connection. We use Flink version
> 1.16.2 and RabbitMQ version 3.10.7. Here's how I'm setting up the
> connection:
>
>  RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
>     .setPrefetchCount(smsInput.prefetchCount)
>     .setHost(smsInput.HostServer)
>     .setPort(smsInput.HostPort)
>     .setUserName(smsInput.HostUserName)
>     .setPassword(smsInput.HostPassword)
>     .setVirtualHost("/")
>     .build();
>
> ConnectionFactory rabbitMQConnectionFactory = connectionConfig.getConnectionFactory();
> rabbitMQConnectionFactory.setRequestedChannelMax(smsInput.prefetchCount); // Set prefetchcount
>
> DataStream<String> stream = executionEnvironment
>     .addSource(new RMQSource<String>(
>         connectionConfig,
>         smsInput.QueueName,
>         new SimpleStringSchema()))
>     .setParallelism(1);
>
>
> Additionally, I have configured the prefetch count to read 3 messages at
> a time from RabbitMQ. Here's how I have enabled checkpointing:
>
>
> executionEnvironment.enableCheckpointing(smsInput.checkpointIntervalMS, CheckpointingMode.EXACTLY_ONCE, true);
>
> The prefetch count seems to work fine on its own, but when I run the job
> with a parallelism of 3, it does not behave as expected.
>
> We establish the connection to RabbitMQ with a fixed setParallelism of 1.
> However, my other operators, which consume the data read from RabbitMQ,
> run with a parallelism of 3, as set by the following command:
>
> bin/flink run -p 3 ../apps/Flink_1.16.2_prefetch.jar
> ../config/app-config.properties -yD
> env.java.home=/usr/lib/jvm/java-11-openjdk-11.0.19.0.7-1.el7_9.x86_64
>
> So kindly provide a solution for configuring the prefetch count with
> parallelism.
>
>
>
> Thanks,
> Ajay Pandey
>
