[FLINK-4251] [Rabbit MQ] Allow users to override queue setup in order to customize queue config
This closes #2281 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bf1bbb15 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bf1bbb15 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bf1bbb15 Branch: refs/heads/release-1.1 Commit: bf1bbb15db06a60b222b835c1439899324c9ba9f Parents: 70a0479 Author: philippgrulich <philippgrul...@hotmail.de> Authored: Thu Jul 21 13:31:24 2016 -0700 Committer: Stephan Ewen <se...@apache.org> Committed: Mon Aug 1 20:07:40 2016 +0200 ---------------------------------------------------------------------- .../streaming/connectors/rabbitmq/RMQSink.java | 21 ++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/bf1bbb15/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java index be7e946..a0795d6 100644 --- a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java +++ b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java @@ -39,11 +39,11 @@ public class RMQSink<IN> extends RichSinkFunction<IN> { private static final Logger LOG = LoggerFactory.getLogger(RMQSink.class); - private String queueName; - private RMQConnectionConfig rmqConnectionConfig; - private transient Connection connection; - private transient Channel channel; - private SerializationSchema<IN> schema; + protected final String queueName; + private final RMQConnectionConfig rmqConnectionConfig; + protected transient Connection connection; + protected transient Channel channel; + protected SerializationSchema<IN> schema; private boolean logFailuresOnly = false; /** @@ -58,6 +58,15 @@ public class RMQSink<IN> extends RichSinkFunction<IN> { } /** + * Sets up the queue. The default implementation just declares the queue. The user may override + * this method to have a custom setup for the queue (i.e. binding the queue to an exchange or + * defining custom queue parameters) + */ + protected void setupQueue() throws IOException { + channel.queueDeclare(queueName, false, false, false, null); + } + + /** * Defines whether the producer should fail on errors, or only log them. * If this is set to true, then exceptions will be only logged, if set to false, * exceptions will be eventually thrown and cause the streaming program to @@ -79,7 +88,7 @@ public class RMQSink<IN> extends RichSinkFunction<IN> { if (channel == null) { throw new RuntimeException("None of RabbitMQ channels are available"); } - channel.queueDeclare(queueName, false, false, false, null); + setupQueue(); } catch (IOException e) { throw new RuntimeException("Error while creating the channel", e); }