Repository: flink
Updated Branches:
  refs/heads/master 7ea9c0195 -> 873d6cd18


[FLINK-4251] [Rabbit MQ] Allow users to override queue setup in order to customize queue config

This closes #2281


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f52d11af
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f52d11af
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f52d11af

Branch: refs/heads/master
Commit: f52d11af65d962fb79fe365c71938afae3fcbc11
Parents: 923c6a7
Author: philippgrulich <philippgrul...@hotmail.de>
Authored: Thu Jul 21 13:31:24 2016 -0700
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 1 19:52:12 2016 +0200

----------------------------------------------------------------------
 .../streaming/connectors/rabbitmq/RMQSink.java  | 21 ++++++++++++++------
 1 file changed, 15 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f52d11af/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
index be7e946..a0795d6 100644
--- a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
+++ b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
@@ -39,11 +39,11 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
 
        private static final Logger LOG = LoggerFactory.getLogger(RMQSink.class);
 
-       private String queueName;
-       private RMQConnectionConfig rmqConnectionConfig;
-       private transient Connection connection;
-       private transient Channel channel;
-       private SerializationSchema<IN> schema;
+       protected final String queueName;
+       private final RMQConnectionConfig rmqConnectionConfig;
+       protected transient Connection connection;
+       protected transient Channel channel;
+       protected SerializationSchema<IN> schema;
        private boolean logFailuresOnly = false;
 
        /**
@@ -58,6 +58,15 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
        }
 
        /**
+        * Sets up the queue. The default implementation just declares the queue. The user may override
+        * this method to have a custom setup for the queue (i.e. binding the queue to an exchange or
+        * defining custom queue parameters)
+        */
+       protected void setupQueue() throws IOException {
+               channel.queueDeclare(queueName, false, false, false, null);
+       }
+
+       /**
         * Defines whether the producer should fail on errors, or only log them.
         * If this is set to true, then exceptions will be only logged, if set to false,
         * exceptions will be eventually thrown and cause the streaming program to
@@ -79,7 +88,7 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
                        if (channel == null) {
                                throw new RuntimeException("None of RabbitMQ channels are available");
                        }
-                       channel.queueDeclare(queueName, false, false, false, null);
+                       setupQueue();
                } catch (IOException e) {
                        throw new RuntimeException("Error while creating the channel", e);
                }

Reply via email to