Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4979#discussion_r162632757

--- Diff: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java ---
@@ -138,7 +138,9 @@ protected ConnectionFactory setupConnectionFactory() throws Exception {
 	 * defining custom queue parameters)
 	 */
 	protected void setupQueue() throws IOException {
-		channel.queueDeclare(queueName, true, false, false, null);
+		if (rmqConnectionConfig.isQueueDeclaration()) {
--- End diff --

Thanks for your contribution to Apache Flink @sihuazhou! I have reviewed your code, and I am not sure if the additional flag is needed. The original author of the `RMQSource` declared this method protected, which means that if you do not want the queue to be declared, you can simply override the method with an empty implementation. For example:

```
env.addSource(new RMQSource<String>(
		connectionConfig, "queueName", true, new SimpleStringSchema()) {
	@Override
	protected void setupQueue() {
		// do not declare queue
	}
});
```

This intent is also reflected in the Javadoc:

```
/**
 * Sets up the queue. The default implementation just declares the queue. The user may override
 * this method to have a custom setup for the queue (i.e. binding the queue to an exchange or
 * defining custom queue parameters)
 */
```

Moreover, `RMQSink#setupQueue` also declares the queue by default, which is not addressed in your pull request.

Please let me know what you think @sihuazhou

cc: @tzulitai @zentol
---