[FLINK-3763] RabbitMQ Source/Sink standardize connection parameters

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/86a80336
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/86a80336
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/86a80336

Branch: refs/heads/master
Commit: 86a803366e1371ad5af539b74565e7b8ea102580
Parents: 776253c
Author: subhankar <subhankar.bis...@target.com>
Authored: Tue May 31 16:08:27 2016 +0530
Committer: Robert Metzger <rmetz...@apache.org>
Committed: Thu Jun 9 15:16:32 2016 +0200

----------------------------------------------------------------------
 docs/apis/streaming/connectors/rabbitmq.md      |  28 +-
 .../streaming/connectors/rabbitmq/RMQSink.java  |  55 ++-
 .../connectors/rabbitmq/RMQSource.java          |  91 +---
 .../rabbitmq/common/RMQConnectionConfig.java    | 448 +++++++++++++++++++
 .../connectors/rabbitmq/RMQSourceTest.java      |  31 +-
 .../common/RMQConnectionConfigTest.java         |  69 +++
 6 files changed, 598 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/86a80336/docs/apis/streaming/connectors/rabbitmq.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/connectors/rabbitmq.md 
b/docs/apis/streaming/connectors/rabbitmq.md
index b48608d..df8cf80 100644
--- a/docs/apis/streaming/connectors/rabbitmq.md
+++ b/docs/apis/streaming/connectors/rabbitmq.md
@@ -47,7 +47,7 @@ A class which provides an interface for receiving data from 
RabbitMQ.
 
 The followings have to be provided for the `RMQSource(…)` constructor in 
order:
 
-- hostName: The RabbitMQ broker hostname.
+- RMQConnectionConfig.
 - queueName: The RabbitMQ queue name.
 - usesCorrelationId: `true` when correlation ids should be used, `false` 
otherwise (default is `false`).
 - deserializationSchema: Deserialization schema to turn messages into Java 
objects.
@@ -71,23 +71,29 @@ Example:
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
+RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
+.setHost("localhost").setPort(5000).setUserName(..)
+.setPassword(..).setVirtualHost("/").build();
 DataStream<String> streamWithoutCorrelationIds = env
-       .addSource(new RMQSource<String>("localhost", "hello", new 
SimpleStringSchema()))
+       .addSource(new RMQSource<String>(connectionConfig, "hello", new 
SimpleStringSchema()))
        .print
 
 DataStream<String> streamWithCorrelationIds = env
-       .addSource(new RMQSource<String>("localhost", "hello", true, new 
SimpleStringSchema()))
+       .addSource(new RMQSource<String>(connectionConfig, "hello", true, new 
SimpleStringSchema()))
        .print
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
+val connectionConfig = new RMQConnectionConfig.Builder()
+.setHost("localhost").setPort(5000).setUserName(..)
+.setPassword(..).setVirtualHost("/").build()
 streamWithoutCorrelationIds = env
-    .addSource(new RMQSource[String]("localhost", "hello", new 
SimpleStringSchema))
+    .addSource(new RMQSource[String](connectionConfig, "hello", new 
SimpleStringSchema))
     .print
 
 streamWithCorrelationIds = env
-    .addSource(new RMQSource[String]("localhost", "hello", true, new 
SimpleStringSchema))
+    .addSource(new RMQSource[String](connectionConfig, "hello", true, new 
SimpleStringSchema))
     .print
 {% endhighlight %}
 </div>
@@ -98,7 +104,7 @@ A class providing an interface for sending data to RabbitMQ.
 
 The followings have to be provided for the `RMQSink(…)` constructor in order:
 
-1. The hostname
+1. RMQConnectionConfig
 2. The queue name
 3. Serialization schema
 
@@ -107,12 +113,18 @@ Example:
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-stream.addSink(new RMQSink<String>("localhost", "hello", new 
StringToByteSerializer()));
+RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
+.setHost("localhost").setPort(5000).setUserName(..)
+.setPassword(..).setVirtualHost("/").build();
+stream.addSink(new RMQSink<String>(connectionConfig, "hello", new 
StringToByteSerializer()));
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-stream.addSink(new RMQSink[String]("localhost", "hello", new 
StringToByteSerializer))
+val connectionConfig = new RMQConnectionConfig.Builder()
+.setHost("localhost").setPort(5000).setUserName(..)
+.setPassword(..).setVirtualHost("/").build()
+stream.addSink(new RMQSink[String](connectionConfig, "hello", new 
StringToByteSerializer))
 {% endhighlight %}
 </div>
 </div>

http://git-wip-us.apache.org/repos/asf/flink/blob/86a80336/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
 
b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
index ca18fc4..bf0cef7 100644
--- 
a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
+++ 
b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import 
org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,38 +35,26 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(RMQSink.class);
 
-       private String QUEUE_NAME;
-       private String HOST_NAME;
-       private transient ConnectionFactory factory;
+       private String queueName;
+       private RMQConnectionConfig rmqConnectionConfig;
        private transient Connection connection;
        private transient Channel channel;
        private SerializationSchema<IN> schema;
 
-       public RMQSink(String HOST_NAME, String QUEUE_NAME, 
SerializationSchema<IN> schema) {
-               this.HOST_NAME = HOST_NAME;
-               this.QUEUE_NAME = QUEUE_NAME;
-               this.schema = schema;
-       }
-
        /**
-        * Initializes the connection to RMQ.
-        */
-       public void initializeConnection() {
-               factory = new ConnectionFactory();
-               factory.setHost(HOST_NAME);
-               try {
-                       connection = factory.newConnection();
-                       channel = connection.createChannel();
-                       channel.queueDeclare(QUEUE_NAME, false, false, false, 
null);
-
-               } catch (IOException e) {
-                       throw new RuntimeException(e);
-               }
+        * @param rmqConnectionConfig The RabbitMQ connection configuration 
{@link RMQConnectionConfig}.
+        * @param queueName The queue to publish messages to.
+        * @param schema A {@link SerializationSchema} for turning the Java 
objects received into bytes
+     */
+       public RMQSink(RMQConnectionConfig rmqConnectionConfig, String 
queueName, SerializationSchema<IN> schema) {
+               this.rmqConnectionConfig = rmqConnectionConfig;
+               this.queueName = queueName;
+               this.schema = schema;
        }
 
        /**
         * Called when new data arrives to the sink, and forwards it to RMQ.
-        * 
+        *
         * @param value
         *            The incoming data
         */
@@ -74,11 +63,11 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
                try {
                        byte[] msg = schema.serialize(value);
 
-                       channel.basicPublish("", QUEUE_NAME, null, msg);
+                       channel.basicPublish("", queueName, null, msg);
 
                } catch (IOException e) {
                        if (LOG.isErrorEnabled()) {
-                               LOG.error("Cannot send RMQ message {} at {}", 
QUEUE_NAME, HOST_NAME);
+                               LOG.error("Cannot send RMQ message {} at {}", 
queueName, rmqConnectionConfig.getHost());
                        }
                }
 
@@ -92,15 +81,23 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
                        channel.close();
                        connection.close();
                } catch (IOException e) {
-                       throw new RuntimeException("Error while closing RMQ 
connection with " + QUEUE_NAME
-                                       + " at " + HOST_NAME, e);
+                       throw new RuntimeException("Error while closing RMQ 
connection with " + queueName
+                               + " at " + rmqConnectionConfig.getHost(), e);
                }
 
        }
 
        @Override
-       public void open(Configuration config) {
-               initializeConnection();
+       public void open(Configuration config) throws Exception {
+               ConnectionFactory factory = 
rmqConnectionConfig.getConnectionFactory();
+               try {
+                       connection = factory.newConnection();
+                       channel = connection.createChannel();
+                       channel.queueDeclare(queueName, false, false, false, 
null);
+
+               } catch (IOException e) {
+                       throw new RuntimeException(e);
+               }
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/86a80336/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
 
b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
index 4f6a07f..8297f9c 100644
--- 
a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
+++ 
b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
@@ -28,6 +28,7 @@ import org.apache.flink.configuration.Configuration;
 import 
org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase;
 import 
org.apache.flink.streaming.api.functions.source.MultipleIdsMessageAcknowledgingSourceBase;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import 
org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 
 import com.rabbitmq.client.Channel;
@@ -66,17 +67,14 @@ import org.slf4j.LoggerFactory;
  * @param <OUT> The type of the data read from RabbitMQ.
  */
 public class RMQSource<OUT> extends 
MultipleIdsMessageAcknowledgingSourceBase<OUT, String, Long>
-               implements ResultTypeQueryable<OUT> {
+       implements ResultTypeQueryable<OUT> {
 
        private static final long serialVersionUID = 1L;
 
        private static final Logger LOG = 
LoggerFactory.getLogger(RMQSource.class);
 
-       private final String hostName;
-       private final Integer port;
-       private final String username;
-       private final String password;
-       protected final String queueName;
+       private final RMQConnectionConfig rmqConnectionConfig;
+       private final String queueName;
        private final boolean usesCorrelationId;
        protected DeserializationSchema<OUT> schema;
 
@@ -92,16 +90,16 @@ public class RMQSource<OUT> extends 
MultipleIdsMessageAcknowledgingSourceBase<OU
         * Creates a new RabbitMQ source with at-least-once message processing 
guarantee when
         * checkpointing is enabled. No strong delivery guarantees when 
checkpointing is disabled.
         * For exactly-once, please use the constructor
-        * {@link RMQSource#RMQSource(String, String, boolean 
usesCorrelationId, DeserializationSchema)},
+        * {@link RMQSource#RMQSource(RMQConnectionConfig, String, boolean 
usesCorrelationId, DeserializationSchema)},
         * set {@param usesCorrelationId} to true and enable checkpointing.
-        * @param hostName The RabbiMQ broker's address to connect to.
+        * @param rmqConnectionConfig The RabbitMQ connection configuration 
{@link RMQConnectionConfig}.
         * @param queueName  The queue to receive messages from.
         * @param deserializationSchema A {@link DeserializationSchema} for 
turning the bytes received
         *                                              into Java objects.
         */
-       public RMQSource(String hostName, String queueName,
-                               DeserializationSchema<OUT> 
deserializationSchema) {
-               this(hostName, null, null, null, queueName, false, 
deserializationSchema);
+       public RMQSource(RMQConnectionConfig rmqConnectionConfig, String 
queueName,
+                                       DeserializationSchema<OUT> 
deserializationSchema) {
+               this(rmqConnectionConfig, queueName, false, 
deserializationSchema);
        }
 
        /**
@@ -109,7 +107,7 @@ public class RMQSource<OUT> extends 
MultipleIdsMessageAcknowledgingSourceBase<OU
         * at the producer. The correlation id must be unique. Otherwise the 
behavior of the source is
         * undefined. In doubt, set {@param usesCorrelationId} to false. When 
correlation ids are not
         * used, this source has at-least-once processing semantics when 
checkpointing is enabled.
-        * @param hostName The RabbitMQ broker's address to connect to.
+        * @param rmqConnectionConfig The RabbitMQ connection configuration 
{@link RMQConnectionConfig}.
         * @param queueName The queue to receive messages from.
         * @param usesCorrelationId Whether the messages received are supplied 
with a <b>unique</b>
         *                          id to deduplicate messages (in case of 
failed acknowledgments).
@@ -117,53 +115,10 @@ public class RMQSource<OUT> extends 
MultipleIdsMessageAcknowledgingSourceBase<OU
         * @param deserializationSchema A {@link DeserializationSchema} for 
turning the bytes received
         *                              into Java objects.
         */
-       public RMQSource(String hostName, String queueName, boolean 
usesCorrelationId,
-                               DeserializationSchema<OUT> 
deserializationSchema) {
-               this(hostName, null, null, null, queueName, usesCorrelationId, 
deserializationSchema);
-       }
-
-       /**
-        * Creates a new RabbitMQ source. For exactly-once, you must set the 
correlation ids of messages
-        * at the producer. The correlation id must be unique. Otherwise the 
behavior of the source is
-        * undefined. In doubt, set {@param usesCorrelationId} to false. When 
correlation ids are not
-        * used, this source has at-least-once processing semantics when 
checkpointing is enabled.
-        * @param hostName The RabbitMQ broker's address to connect to.
-        * @param port The RabbitMQ broker's port.
-        * @param queueName The queue to receive messages from.
-        * @param usesCorrelationId Whether the messages received are supplied 
with a <b>unique</b>
-        *                          id to deduplicate messages (in case of 
failed acknowledgments).
-        *                          Only used when checkpointing is enabled.
-        * @param deserializationSchema A {@link DeserializationSchema} for 
turning the bytes received
-        *                              into Java objects.
-        */
-       public RMQSource(String hostName, Integer port,
-                               String queueName, boolean usesCorrelationId,
-                               DeserializationSchema<OUT> 
deserializationSchema) {
-               this(hostName, port, null, null, queueName, usesCorrelationId, 
deserializationSchema);
-       }
-
-       /**
-        * Creates a new RabbitMQ source. For exactly-once, you must set the 
correlation ids of messages
-        * at the producer. The correlation id must be unique. Otherwise the 
behavior of the source is
-        * undefined. In doubt, set {@param usesCorrelationId} to false. When 
correlation ids are not
-        * used, this source has at-least-once processing semantics when 
checkpointing is enabled.
-        * @param hostName The RabbitMQ broker's address to connect to.
-        * @param port The RabbitMQ broker's port.
-        * @param queueName The queue to receive messages from.
-        * @param usesCorrelationId Whether the messages received are supplied 
with a <b>unique</b>
-        *                          id to deduplicate messages (in case of 
failed acknowledgments).
-        *                          Only used when checkpointing is enabled.
-        * @param deserializationSchema A {@link DeserializationSchema} for 
turning the bytes received
-        *                              into Java objects.
-        */
-       public RMQSource(String hostName, Integer port, String username, String 
password,
-                               String queueName, boolean usesCorrelationId,
-                               DeserializationSchema<OUT> 
deserializationSchema) {
+       public RMQSource(RMQConnectionConfig rmqConnectionConfig,
+                                       String queueName, boolean 
usesCorrelationId,DeserializationSchema<OUT> deserializationSchema) {
                super(String.class);
-               this.hostName = hostName;
-               this.port = port;
-               this.username = username;
-               this.password = password;
+               this.rmqConnectionConfig = rmqConnectionConfig;
                this.queueName = queueName;
                this.usesCorrelationId = usesCorrelationId;
                this.schema = deserializationSchema;
@@ -173,8 +128,8 @@ public class RMQSource<OUT> extends 
MultipleIdsMessageAcknowledgingSourceBase<OU
         * Initializes the connection to RMQ with a default connection factory. 
The user may override
         * this method to setup and configure their own ConnectionFactory.
         */
-       protected ConnectionFactory setupConnectionFactory() {
-               return new ConnectionFactory();
+       protected ConnectionFactory setupConnectionFactory() throws Exception {
+               return rmqConnectionConfig.getConnectionFactory();
        }
 
        /**
@@ -189,18 +144,8 @@ public class RMQSource<OUT> extends 
MultipleIdsMessageAcknowledgingSourceBase<OU
        /**
         * Initializes the connection to RMQ.
         */
-       private void initializeConnection() {
+       private void initializeConnection() throws Exception {
                ConnectionFactory factory = setupConnectionFactory();
-               factory.setHost(hostName);
-               if (port != null) {
-                       factory.setPort(port);
-               }
-               if (username != null) {
-                       factory.setUsername(username);
-               }
-               if (password != null) {
-                       factory.setPassword(password);
-               }
                try {
                        connection = factory.newConnection();
                        channel = connection.createChannel();
@@ -222,7 +167,7 @@ public class RMQSource<OUT> extends 
MultipleIdsMessageAcknowledgingSourceBase<OU
 
                } catch (IOException e) {
                        throw new RuntimeException("Cannot create RMQ 
connection with " + queueName + " at "
-                                       + hostName, e);
+                               + rmqConnectionConfig.getHost(), e);
                }
        }
 
@@ -240,7 +185,7 @@ public class RMQSource<OUT> extends 
MultipleIdsMessageAcknowledgingSourceBase<OU
                        connection.close();
                } catch (IOException e) {
                        throw new RuntimeException("Error while closing RMQ 
connection with " + queueName
-                                       + " at " + hostName, e);
+                               + " at " + rmqConnectionConfig.getHost(), e);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/86a80336/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java
 
b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java
new file mode 100644
index 0000000..0ce7e79
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java
@@ -0,0 +1,448 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq.common;
+
+import com.rabbitmq.client.ConnectionFactory;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.net.URISyntaxException;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+
+/**
+ * Connection Configuration for RMQ.
+ * If {@link Builder#setUri(String)} has been set, then {@link 
RMQConnectionConfig#RMQConnectionConfig(String, Integer,
+ * Boolean, Boolean, Integer, Integer, Integer, Integer)}
+ * will be used to initialize the RMQ connection; otherwise
+ * {@link RMQConnectionConfig#RMQConnectionConfig(String, Integer, String, 
String, String, Integer, Boolean,
+ * Boolean, Integer, Integer, Integer, Integer)}
+ * will be used to initialize the RMQ connection.
+ */
+public class RMQConnectionConfig implements Serializable {
+
+       private static final long serialVersionUID = 1L;
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(RMQConnectionConfig.class);
+
+       private String host;
+       private Integer port;
+       private String virtualHost;
+       private String username;
+       private String password;
+       private String uri;
+
+       private Integer networkRecoveryInterval;
+       private Boolean automaticRecovery;
+       private Boolean topologyRecovery;
+
+       private Integer connectionTimeout;
+       private Integer requestedChannelMax;
+       private Integer requestedFrameMax;
+       private Integer requestedHeartbeat;
+
+       /**
+       *
+       * @param host host name
+       * @param port port
+       * @param virtualHost virtual host
+       * @param username username
+       * @param password password
+       * @param networkRecoveryInterval connection recovery interval in 
milliseconds
+       * @param automaticRecovery if automatic connection recovery
+       * @param topologyRecovery if topology recovery
+       * @param connectionTimeout connection timeout
+       * @param requestedChannelMax requested maximum channel number
+       * @param requestedFrameMax requested maximum frame size
+       * @param requestedHeartbeat requested heartbeat interval
+       * @throws NullPointerException if host, port, virtual host, username or 
password is null
+    */
+       private RMQConnectionConfig(String host, Integer port, String 
virtualHost, String username, String password,
+                                                               Integer 
networkRecoveryInterval, Boolean automaticRecovery,
+                                                               Boolean 
topologyRecovery, Integer connectionTimeout, Integer requestedChannelMax,
+                                                               Integer 
requestedFrameMax, Integer requestedHeartbeat){
+               Preconditions.checkNotNull(host, "host can not be null");
+               Preconditions.checkNotNull(port, "port can not be null");
+               Preconditions.checkNotNull(virtualHost, "virtualHost can not be 
null");
+               Preconditions.checkNotNull(username, "username can not be 
null");
+               Preconditions.checkNotNull(password, "password can not be 
null");
+               this.host = host;
+               this.port = port;
+               this.virtualHost = virtualHost;
+               this.username = username;
+               this.password = password;
+
+               this.networkRecoveryInterval = networkRecoveryInterval;
+               this.automaticRecovery = automaticRecovery;
+               this.topologyRecovery = topologyRecovery;
+               this.connectionTimeout = connectionTimeout;
+               this.requestedChannelMax = requestedChannelMax;
+               this.requestedFrameMax = requestedFrameMax;
+               this.requestedHeartbeat = requestedHeartbeat;
+       }
+
+       /**
+       *
+       * @param uri the connection URI
+       * @param networkRecoveryInterval connection recovery interval in 
milliseconds
+       * @param automaticRecovery if automatic connection recovery
+       * @param topologyRecovery if topology recovery
+       * @param connectionTimeout connection timeout
+       * @param requestedChannelMax requested maximum channel number
+       * @param requestedFrameMax requested maximum frame size
+       * @param requestedHeartbeat requested heartbeat interval
+       * @throws NullPointerException if URI is null
+       */
+       private RMQConnectionConfig(String uri, Integer 
networkRecoveryInterval, Boolean automaticRecovery,
+                                                               Boolean 
topologyRecovery, Integer connectionTimeout, Integer requestedChannelMax,
+                                                               Integer 
requestedFrameMax, Integer requestedHeartbeat){
+               Preconditions.checkNotNull(uri, "Uri can not be null");
+               this.uri = uri;
+
+               this.networkRecoveryInterval = networkRecoveryInterval;
+               this.automaticRecovery = automaticRecovery;
+               this.topologyRecovery = topologyRecovery;
+               this.connectionTimeout = connectionTimeout;
+               this.requestedChannelMax = requestedChannelMax;
+               this.requestedFrameMax = requestedFrameMax;
+               this.requestedHeartbeat = requestedHeartbeat;
+       }
+
+       /** @return the host to use for connections */
+       public String getHost() {
+               return host;
+       }
+
+       /** @return the port to use for connections */
+       public int getPort() {
+               return port;
+       }
+
+       /**
+        * Retrieve the virtual host.
+        * @return the virtual host to use when connecting to the broker
+        */
+       public String getVirtualHost() {
+               return virtualHost;
+       }
+
+       /**
+        * Retrieve the user name.
+        * @return the AMQP user name to use when connecting to the broker
+        */
+       public String getUsername() {
+               return username;
+       }
+
+       /**
+        * Retrieve the password.
+        * @return the password to use when connecting to the broker
+        */
+       public String getPassword() {
+               return password;
+       }
+
+       /**
+        * Retrieve the URI.
+        * @return the connection URI when connecting to the broker
+     */
+       public String getUri() {
+               return uri;
+       }
+
+       /**
+        * Returns automatic connection recovery interval in milliseconds.
+        * @return how long will automatic recovery wait before attempting to 
reconnect, in ms; default is 5000
+        */
+       public Integer getNetworkRecoveryInterval() {
+               return networkRecoveryInterval;
+       }
+
+       /**
+        * Returns true if automatic connection recovery is enabled, false 
otherwise
+        * @return true if automatic connection recovery is enabled, false 
otherwise
+        */
+       public Boolean isAutomaticRecovery() {
+               return automaticRecovery;
+       }
+
+       /**
+        * Returns true if topology recovery is enabled, false otherwise
+        * @return true if topology recovery is enabled, false otherwise
+        */
+       public Boolean isTopologyRecovery() {
+               return topologyRecovery;
+       }
+
+       /**
+        * Retrieve the connection timeout.
+        * @return the connection timeout, in milliseconds; zero for infinite
+        */
+       public Integer getConnectionTimeout() {
+               return connectionTimeout;
+       }
+
+       /**
+        * Retrieve the requested maximum channel number
+        * @return the initially requested maximum channel number; zero for 
unlimited
+        */
+       public Integer getRequestedChannelMax() {
+               return requestedChannelMax;
+       }
+
+       /**
+        * Retrieve the requested maximum frame size
+        * @return the initially requested maximum frame size, in octets; zero 
for unlimited
+        */
+       public Integer getRequestedFrameMax() {
+               return requestedFrameMax;
+       }
+
+       /**
+        * Retrieve the requested heartbeat interval.
+        * @return the initially requested heartbeat interval, in seconds; zero 
for none
+        */
+       public Integer getRequestedHeartbeat() {
+               return requestedHeartbeat;
+       }
+
+       /**
+        *
+        * @return Connection Factory for RMQ
+        * @throws URISyntaxException, NoSuchAlgorithmException, 
KeyManagementException if a malformed URI has been passed
+     */
+       public ConnectionFactory getConnectionFactory() throws 
URISyntaxException,
+               NoSuchAlgorithmException, KeyManagementException {
+               ConnectionFactory factory = new ConnectionFactory();
+               if (this.uri != null && !this.uri.isEmpty()){
+                       try {
+                               factory.setUri(getUri());
+                       }catch (URISyntaxException | NoSuchAlgorithmException | 
KeyManagementException e){
+                               LOG.error("Failed to parse uri {}", 
e.getMessage());
+                               throw e;
+                       }
+               } else {
+                       factory.setHost(this.host);
+                       factory.setPort(this.port);
+                       factory.setVirtualHost(this.virtualHost);
+                       factory.setUsername(this.username);
+                       factory.setPassword(this.password);
+               }
+
+               if (this.automaticRecovery != null) {
+                       
factory.setAutomaticRecoveryEnabled(this.automaticRecovery);
+               }
+               if (this.connectionTimeout != null) {
+                       factory.setConnectionTimeout(this.connectionTimeout);
+               }
+               if (this.networkRecoveryInterval != null) {
+                       
factory.setNetworkRecoveryInterval(this.networkRecoveryInterval);
+               }
+               if (this.requestedHeartbeat != null) {
+                       factory.setRequestedHeartbeat(this.requestedHeartbeat);
+               }
+               if (this.topologyRecovery != null) {
+                       
factory.setTopologyRecoveryEnabled(this.topologyRecovery);
+               }
+               if (this.requestedChannelMax != null) {
+                       
factory.setRequestedChannelMax(this.requestedChannelMax);
+               }
+               if (this.requestedFrameMax != null) {
+                       factory.setRequestedFrameMax(this.requestedFrameMax);
+               }
+
+               return factory;
+       }
+
+       /**
+        * The Builder Class for {@link RMQConnectionConfig}
+        */
+       public static class Builder {
+
+               private String host;
+               private Integer port;
+               private String virtualHost;
+               private String username;
+               private String password;
+
+               private Integer networkRecoveryInterval;
+               private Boolean automaticRecovery;
+               private Boolean topologyRecovery;
+
+               private Integer connectionTimeout;
+               private Integer requestedChannelMax;
+               private Integer requestedFrameMax;
+               private Integer requestedHeartbeat;
+
+               private String uri;
+
+               /**
+                * Set the target port.
+                * @param port the default port to use for connections
+                * @return the Builder
+                */
+               public Builder setPort(int port) {
+                       this.port = port;
+                       return this;
+               }
+
+               /** @param host the default host to use for connections
+                * @return the Builder
+                */
+               public Builder setHost(String host) {
+                       this.host = host;
+                       return this;
+               }
+
+               /**
+                * Set the virtual host.
+                * @param virtualHost the virtual host to use when connecting 
to the broker
+                * @return the Builder
+                */
+               public Builder setVirtualHost(String virtualHost) {
+                       this.virtualHost = virtualHost;
+                       return this;
+               }
+
+               /**
+                * Set the user name.
+                * @param username the AMQP user name to use when connecting to 
the broker
+                * @return the Builder
+                */
+               public Builder setUserName(String username) {
+                       this.username = username;
+                       return this;
+               }
+
+               /**
+                * Set the password.
+                * @param password the password to use when connecting to the 
broker
+                * @return the Builder
+                */
+               public Builder setPassword(String password) {
+                       this.password = password;
+                       return this;
+               }
+
+               /**
+                * Convenience method for setting the fields in an AMQP URI: 
host,
+                * port, username, password and virtual host.  If any part of 
the
+                * URI is omitted, the ConnectionFactory's corresponding 
variable
+                * is left unchanged.
+                * @param uri is the AMQP URI containing the data
+                * @return the Builder
+                */
+               public Builder setUri(String uri) {
+                       this.uri = uri;
+                       return this;
+               }
+
+               /**
+                * Enables or disables topology recovery
+                * @param topologyRecovery if true, enables topology recovery
+                * @return the Builder
+                */
+               public Builder setTopologyRecoveryEnabled(boolean 
topologyRecovery) {
+                       this.topologyRecovery = topologyRecovery;
+                       return this;
+               }
+
+               /**
+                * Set the requested heartbeat.
+                * @param requestedHeartbeat the initially requested heartbeat 
interval, in seconds; zero for none
+                * @return the Builder
+                */
+               public Builder setRequestedHeartbeat(int requestedHeartbeat) {
+                       this.requestedHeartbeat = requestedHeartbeat;
+                       return this;
+               }
+
+               /**
+                * Set the requested maximum frame size
+                * @param requestedFrameMax initially requested maximum frame 
size, in octets; zero for unlimited
+                * @return the Builder
+                */
+               public Builder setRequestedFrameMax(int requestedFrameMax) {
+                       this.requestedFrameMax = requestedFrameMax;
+                       return this;
+               }
+
+               /**
+                * Set the requested maximum channel number
+                * @param requestedChannelMax initially requested maximum 
channel number; zero for unlimited
+                */
+               public Builder setRequestedChannelMax(int requestedChannelMax) {
+                       this.requestedChannelMax = requestedChannelMax;
+                       return this;
+               }
+
+               /**
+                * Sets connection recovery interval. Default is 5000.
+                * @param networkRecoveryInterval how long will automatic 
recovery wait before attempting to reconnect, in ms
+                * @return the Builder
+                */
+               public Builder setNetworkRecoveryInterval(int 
networkRecoveryInterval) {
+                       this.networkRecoveryInterval = networkRecoveryInterval;
+                       return this;
+               }
+
+               /**
+                * Set the connection timeout.
+                * @param connectionTimeout connection establishment timeout in 
milliseconds; zero for infinite
+                * @return the Builder
+                */
+               public Builder setConnectionTimeout(int connectionTimeout) {
+                       this.connectionTimeout = connectionTimeout;
+                       return this;
+               }
+
+               /**
+                * Enables or disables automatic connection recovery
+                * @param automaticRecovery if true, enables connection recovery
+                * @return the Builder
+                */
+               public Builder setAutomaticRecovery(boolean automaticRecovery) {
+                       this.automaticRecovery = automaticRecovery;
+                       return this;
+               }
+
+               /**
+                * The Builder method
+                * If URI is NULL we use host, port, vHost, username, password 
combination
+                * to initialize the connection, using {@link 
RMQConnectionConfig#RMQConnectionConfig(String, Integer, String, String, String,
+                * Integer, Boolean, Boolean, Integer, Integer, Integer, 
Integer)}
+                *
+                * else URI will be used to initialize the client connection
+                * {@link RMQConnectionConfig#RMQConnectionConfig(String, 
Integer, Boolean, Boolean, Integer, Integer, Integer, Integer)}
+                * @return RMQConnectionConfig
+         */
+               public RMQConnectionConfig build(){
+                       if(this.uri != null) {
+                               return new RMQConnectionConfig(this.uri, 
this.networkRecoveryInterval,
+                                       this.automaticRecovery, 
this.topologyRecovery, this.connectionTimeout, this.requestedChannelMax,
+                                       this.requestedFrameMax, 
this.requestedHeartbeat);
+                       } else {
+                               return new RMQConnectionConfig(this.host, 
this.port, this.virtualHost, this.username, this.password,
+                                       this.networkRecoveryInterval, 
this.automaticRecovery, this.topologyRecovery,
+                                       this.connectionTimeout, 
this.requestedChannelMax, this.requestedFrameMax, this.requestedHeartbeat);
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/86a80336/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
 
b/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
index 21f185f..31128a9 100644
--- 
a/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
+++ 
b/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
@@ -32,6 +32,7 @@ import 
org.apache.flink.runtime.state.SerializedCheckpointData;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import 
org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.junit.After;
 import org.junit.Before;
@@ -220,11 +221,12 @@ public class RMQSourceTest {
         * Tests whether constructor params are passed correctly.
         */
        @Test
-       public void testConstructorParams() {
+       public void testConstructorParams() throws Exception {
                // verify construction params
+               RMQConnectionConfig.Builder builder = new 
RMQConnectionConfig.Builder();
+               
builder.setHost("hostTest").setPort(999).setUserName("userTest").setPassword("passTest").setVirtualHost("/");
                ConstructorTestClass testObj = new ConstructorTestClass(
-                       "hostTest", 999, "userTest", "passTest",
-                       "queueTest", false, new StringDeserializationScheme());
+                       builder.build(), "queueTest", false, new 
StringDeserializationScheme());
 
                try {
                        testObj.open(new Configuration());
@@ -240,17 +242,16 @@ public class RMQSourceTest {
 
        private static class ConstructorTestClass extends RMQSource<String> {
 
-               private ConnectionFactory factory = Mockito.spy(new 
ConnectionFactory());
-
-               public ConstructorTestClass(String hostName, Integer port,
-                               String username,
-                               String password,
-                               String queueName,
-                               boolean usesCorrelationId,
-                               DeserializationSchema<String> 
deserializationSchema) {
-                       super(hostName, port, username, password,
-                               queueName, usesCorrelationId, 
deserializationSchema);
+               private ConnectionFactory factory;
 
+               public ConstructorTestClass(RMQConnectionConfig 
rmqConnectionConfig,
+                                                                       String 
queueName,
+                                                                       boolean 
usesCorrelationId,
+                                                                       
DeserializationSchema<String> deserializationSchema) throws Exception {
+                       super(rmqConnectionConfig, queueName, 
usesCorrelationId, deserializationSchema);
+                       RMQConnectionConfig.Builder builder = new 
RMQConnectionConfig.Builder();
+                       
builder.setHost("hostTest").setPort(999).setUserName("userTest").setPassword("passTest").setVirtualHost("/");
+                       factory = 
Mockito.spy(builder.build().getConnectionFactory());
                        try {
                                Mockito.doThrow(new 
RuntimeException()).when(factory).newConnection();
                        } catch (IOException e) {
@@ -295,7 +296,9 @@ public class RMQSourceTest {
        private class RMQTestSource extends RMQSource<String> {
 
                public RMQTestSource() {
-                       super("hostDummy", -1, "", "", "queueDummy", true, new 
StringDeserializationScheme());
+                       super(new 
RMQConnectionConfig.Builder().setHost("hostTest")
+                                       
.setPort(999).setUserName("userTest").setPassword("passTest").setVirtualHost("/").build()
+                               , "queueDummy", true, new 
StringDeserializationScheme());
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/86a80336/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfigTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfigTest.java
 
b/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfigTest.java
new file mode 100644
index 0000000..40985ce
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfigTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq.common;
+
+import com.rabbitmq.client.ConnectionFactory;
+import org.junit.Test;
+
+import java.net.URISyntaxException;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+
+import static org.junit.Assert.assertEquals;
+
+
+public class RMQConnectionConfigTest {
+
+       @Test(expected = NullPointerException.class)
+       public void shouldThrowNullPointExceptionIfHostIsNull() throws 
NoSuchAlgorithmException,
+               KeyManagementException, URISyntaxException {
+               RMQConnectionConfig connectionConfig = new 
RMQConnectionConfig.Builder()
+                       .setPort(1000).setUserName("guest")
+                       .setPassword("guest").setVirtualHost("/").build();
+               connectionConfig.getConnectionFactory();
+       }
+       @Test(expected = NullPointerException.class)
+       public void shouldThrowNullPointExceptionIfPortIsNull() throws 
NoSuchAlgorithmException,
+               KeyManagementException, URISyntaxException {
+               RMQConnectionConfig connectionConfig = new 
RMQConnectionConfig.Builder()
+                       .setHost("localhost").setUserName("guest")
+                       .setPassword("guest").setVirtualHost("/").build();
+               connectionConfig.getConnectionFactory();
+       }
+
+       @Test(expected = NullPointerException.class)
+       public void shouldSetDefaultValueIfConnectionTimeoutNotGiven() throws 
NoSuchAlgorithmException,
+               KeyManagementException, URISyntaxException {
+               RMQConnectionConfig connectionConfig = new 
RMQConnectionConfig.Builder()
+                       .setHost("localhost").setUserName("guest")
+                       .setPassword("guest").setVirtualHost("/").build();
+               ConnectionFactory factory = 
connectionConfig.getConnectionFactory();
+               assertEquals(ConnectionFactory.DEFAULT_CONNECTION_TIMEOUT, 
factory.getConnectionTimeout());
+       }
+
+       @Test
+       public void shouldSetProvidedValueIfConnectionTimeoutNotGiven() throws 
NoSuchAlgorithmException,
+               KeyManagementException, URISyntaxException {
+               RMQConnectionConfig connectionConfig = new 
RMQConnectionConfig.Builder()
+                       .setHost("localhost").setPort(5000).setUserName("guest")
+                       .setPassword("guest").setVirtualHost("/")
+                       .setConnectionTimeout(5000).build();
+               ConnectionFactory factory = 
connectionConfig.getConnectionFactory();
+               assertEquals(5000, factory.getConnectionTimeout());
+       }
+}

Reply via email to