[FLINK-3763] RabbitMQ Source/Sink standardize connection parameters
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/86a80336 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/86a80336 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/86a80336 Branch: refs/heads/master Commit: 86a803366e1371ad5af539b74565e7b8ea102580 Parents: 776253c Author: subhankar <subhankar.bis...@target.com> Authored: Tue May 31 16:08:27 2016 +0530 Committer: Robert Metzger <rmetz...@apache.org> Committed: Thu Jun 9 15:16:32 2016 +0200 ---------------------------------------------------------------------- docs/apis/streaming/connectors/rabbitmq.md | 28 +- .../streaming/connectors/rabbitmq/RMQSink.java | 55 ++- .../connectors/rabbitmq/RMQSource.java | 91 +--- .../rabbitmq/common/RMQConnectionConfig.java | 448 +++++++++++++++++++ .../connectors/rabbitmq/RMQSourceTest.java | 31 +- .../common/RMQConnectionConfigTest.java | 69 +++ 6 files changed, 598 insertions(+), 124 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/86a80336/docs/apis/streaming/connectors/rabbitmq.md ---------------------------------------------------------------------- diff --git a/docs/apis/streaming/connectors/rabbitmq.md b/docs/apis/streaming/connectors/rabbitmq.md index b48608d..df8cf80 100644 --- a/docs/apis/streaming/connectors/rabbitmq.md +++ b/docs/apis/streaming/connectors/rabbitmq.md @@ -47,7 +47,7 @@ A class which provides an interface for receiving data from RabbitMQ. The followings have to be provided for the `RMQSource(â¦)` constructor in order: -- hostName: The RabbitMQ broker hostname. +- RMQConnectionConfig. - queueName: The RabbitMQ queue name. - usesCorrelationId: `true` when correlation ids should be used, `false` otherwise (default is `false`). - deserializationSchema: Deserialization schema to turn messages into Java objects. @@ -71,23 +71,29 @@ Example: <div class="codetabs" markdown="1"> <div data-lang="java" markdown="1"> {% highlight java %} +RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder() +.setHost("localhost").setPort(5000).setUserName(..) +.setPassword(..).setVirtualHost("/").build(); DataStream<String> streamWithoutCorrelationIds = env - .addSource(new RMQSource<String>("localhost", "hello", new SimpleStringSchema())) + .addSource(new RMQSource<String>(connectionConfig, "hello", new SimpleStringSchema())) .print DataStream<String> streamWithCorrelationIds = env - .addSource(new RMQSource<String>("localhost", "hello", true, new SimpleStringSchema())) + .addSource(new RMQSource<String>(connectionConfig, "hello", true, new SimpleStringSchema())) .print {% endhighlight %} </div> <div data-lang="scala" markdown="1"> {% highlight scala %} +val connectionConfig = new RMQConnectionConfig.Builder() +.setHost("localhost").setPort(5000).setUserName(..) +.setPassword(..).setVirtualHost("/").build() streamWithoutCorrelationIds = env - .addSource(new RMQSource[String]("localhost", "hello", new SimpleStringSchema)) + .addSource(new RMQSource[String](connectionConfig, "hello", new SimpleStringSchema)) .print streamWithCorrelationIds = env - .addSource(new RMQSource[String]("localhost", "hello", true, new SimpleStringSchema)) + .addSource(new RMQSource[String](connectionConfig, "hello", true, new SimpleStringSchema)) .print {% endhighlight %} </div> @@ -98,7 +104,7 @@ A class providing an interface for sending data to RabbitMQ. The followings have to be provided for the `RMQSink(â¦)` constructor in order: -1. The hostname +1. RMQConnectionConfig 2. The queue name 3. Serialization schema @@ -107,12 +113,18 @@ Example: <div class="codetabs" markdown="1"> <div data-lang="java" markdown="1"> {% highlight java %} -stream.addSink(new RMQSink<String>("localhost", "hello", new StringToByteSerializer())); +RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder() +.setHost("localhost").setPort(5000).setUserName(..) +.setPassword(..).setVirtualHost("/").build(); +stream.addSink(new RMQSink<String>(connectionConfig, "hello", new StringToByteSerializer())); {% endhighlight %} </div> <div data-lang="scala" markdown="1"> {% highlight scala %} -stream.addSink(new RMQSink[String]("localhost", "hello", new StringToByteSerializer)) +val connectionConfig = new RMQConnectionConfig.Builder() +.setHost("localhost").setPort(5000).setUserName(..) +.setPassword(..).setVirtualHost("/").build() +stream.addSink(new RMQSink[String](connectionConfig, "hello", new StringToByteSerializer)) {% endhighlight %} </div> </div> http://git-wip-us.apache.org/repos/asf/flink/blob/86a80336/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java index ca18fc4..bf0cef7 100644 --- a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java +++ b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java @@ -21,6 +21,7 @@ import java.io.IOException; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig; import org.apache.flink.streaming.util.serialization.SerializationSchema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,38 +35,26 @@ public class RMQSink<IN> extends RichSinkFunction<IN> { private static final Logger LOG = LoggerFactory.getLogger(RMQSink.class); - private String QUEUE_NAME; - private String HOST_NAME; - private transient ConnectionFactory factory; + private String queueName; + private RMQConnectionConfig rmqConnectionConfig; private transient Connection connection; private transient Channel channel; private SerializationSchema<IN> schema; - public RMQSink(String HOST_NAME, String QUEUE_NAME, SerializationSchema<IN> schema) { - this.HOST_NAME = HOST_NAME; - this.QUEUE_NAME = QUEUE_NAME; - this.schema = schema; - } - /** - * Initializes the connection to RMQ. - */ - public void initializeConnection() { - factory = new ConnectionFactory(); - factory.setHost(HOST_NAME); - try { - connection = factory.newConnection(); - channel = connection.createChannel(); - channel.queueDeclare(QUEUE_NAME, false, false, false, null); - - } catch (IOException e) { - throw new RuntimeException(e); - } + * @param rmqConnectionConfig The RabbiMQ connection configuration {@link RMQConnectionConfig}. + * @param queueName The queue to publish messages to. + * @param schema A {@link SerializationSchema} for turning the Java objects received into bytes + */ + public RMQSink(RMQConnectionConfig rmqConnectionConfig, String queueName, SerializationSchema<IN> schema) { + this.rmqConnectionConfig = rmqConnectionConfig; + this.queueName = queueName; + this.schema = schema; } /** * Called when new data arrives to the sink, and forwards it to RMQ. - * + * * @param value * The incoming data */ @@ -74,11 +63,11 @@ public class RMQSink<IN> extends RichSinkFunction<IN> { try { byte[] msg = schema.serialize(value); - channel.basicPublish("", QUEUE_NAME, null, msg); + channel.basicPublish("", queueName, null, msg); } catch (IOException e) { if (LOG.isErrorEnabled()) { - LOG.error("Cannot send RMQ message {} at {}", QUEUE_NAME, HOST_NAME); + LOG.error("Cannot send RMQ message {} at {}", queueName, rmqConnectionConfig.getHost()); } } @@ -92,15 +81,23 @@ public class RMQSink<IN> extends RichSinkFunction<IN> { channel.close(); connection.close(); } catch (IOException e) { - throw new RuntimeException("Error while closing RMQ connection with " + QUEUE_NAME - + " at " + HOST_NAME, e); + throw new RuntimeException("Error while closing RMQ connection with " + queueName + + " at " + rmqConnectionConfig.getHost(), e); } } @Override - public void open(Configuration config) { - initializeConnection(); + public void open(Configuration config) throws Exception { + ConnectionFactory factory = rmqConnectionConfig.getConnectionFactory(); + try { + connection = factory.newConnection(); + channel = connection.createChannel(); + channel.queueDeclare(queueName, false, false, false, null); + + } catch (IOException e) { + throw new RuntimeException(e); + } } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/86a80336/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java index 4f6a07f..8297f9c 100644 --- a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java +++ b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java @@ -28,6 +28,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase; import org.apache.flink.streaming.api.functions.source.MultipleIdsMessageAcknowledgingSourceBase; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig; import org.apache.flink.streaming.util.serialization.DeserializationSchema; import com.rabbitmq.client.Channel; @@ -66,17 +67,14 @@ import org.slf4j.LoggerFactory; * @param <OUT> The type of the data read from RabbitMQ. */ public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OUT, String, Long> - implements ResultTypeQueryable<OUT> { + implements ResultTypeQueryable<OUT> { private static final long serialVersionUID = 1L; private static final Logger LOG = LoggerFactory.getLogger(RMQSource.class); - private final String hostName; - private final Integer port; - private final String username; - private final String password; - protected final String queueName; + private final RMQConnectionConfig rmqConnectionConfig; + private final String queueName; private final boolean usesCorrelationId; protected DeserializationSchema<OUT> schema; @@ -92,16 +90,16 @@ public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OU * Creates a new RabbitMQ source with at-least-once message processing guarantee when * checkpointing is enabled. No strong delivery guarantees when checkpointing is disabled. * For exactly-once, please use the constructor - * {@link RMQSource#RMQSource(String, String, boolean usesCorrelationId, DeserializationSchema)}, + * {@link RMQSource#RMQSource(RMQConnectionConfig, String, boolean usesCorrelationId, DeserializationSchema)}, * set {@param usesCorrelationId} to true and enable checkpointing. - * @param hostName The RabbiMQ broker's address to connect to. + * @param rmqConnectionConfig The RabbiMQ connection configuration {@link RMQConnectionConfig}. * @param queueName The queue to receive messages from. * @param deserializationSchema A {@link DeserializationSchema} for turning the bytes received * into Java objects. */ - public RMQSource(String hostName, String queueName, - DeserializationSchema<OUT> deserializationSchema) { - this(hostName, null, null, null, queueName, false, deserializationSchema); + public RMQSource(RMQConnectionConfig rmqConnectionConfig, String queueName, + DeserializationSchema<OUT> deserializationSchema) { + this(rmqConnectionConfig, queueName, false, deserializationSchema); } /** @@ -109,7 +107,7 @@ public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OU * at the producer. The correlation id must be unique. Otherwise the behavior of the source is * undefined. In doubt, set {@param usesCorrelationId} to false. When correlation ids are not * used, this source has at-least-once processing semantics when checkpointing is enabled. - * @param hostName The RabbitMQ broker's address to connect to. + * @param rmqConnectionConfig The RabbiMQ connection configuration {@link RMQConnectionConfig}. * @param queueName The queue to receive messages from. * @param usesCorrelationId Whether the messages received are supplied with a <b>unique</b> * id to deduplicate messages (in case of failed acknowledgments). @@ -117,53 +115,10 @@ public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OU * @param deserializationSchema A {@link DeserializationSchema} for turning the bytes received * into Java objects. */ - public RMQSource(String hostName, String queueName, boolean usesCorrelationId, - DeserializationSchema<OUT> deserializationSchema) { - this(hostName, null, null, null, queueName, usesCorrelationId, deserializationSchema); - } - - /** - * Creates a new RabbitMQ source. For exactly-once, you must set the correlation ids of messages - * at the producer. The correlation id must be unique. Otherwise the behavior of the source is - * undefined. In doubt, set {@param usesCorrelationId} to false. When correlation ids are not - * used, this source has at-least-once processing semantics when checkpointing is enabled. - * @param hostName The RabbitMQ broker's address to connect to. - * @param port The RabbitMQ broker's port. - * @param queueName The queue to receive messages from. - * @param usesCorrelationId Whether the messages received are supplied with a <b>unique</b> - * id to deduplicate messages (in case of failed acknowledgments). - * Only used when checkpointing is enabled. - * @param deserializationSchema A {@link DeserializationSchema} for turning the bytes received - * into Java objects. - */ - public RMQSource(String hostName, Integer port, - String queueName, boolean usesCorrelationId, - DeserializationSchema<OUT> deserializationSchema) { - this(hostName, port, null, null, queueName, usesCorrelationId, deserializationSchema); - } - - /** - * Creates a new RabbitMQ source. For exactly-once, you must set the correlation ids of messages - * at the producer. The correlation id must be unique. Otherwise the behavior of the source is - * undefined. In doubt, set {@param usesCorrelationId} to false. When correlation ids are not - * used, this source has at-least-once processing semantics when checkpointing is enabled. - * @param hostName The RabbitMQ broker's address to connect to. - * @param port The RabbitMQ broker's port. - * @param queueName The queue to receive messages from. - * @param usesCorrelationId Whether the messages received are supplied with a <b>unique</b> - * id to deduplicate messages (in case of failed acknowledgments). - * Only used when checkpointing is enabled. - * @param deserializationSchema A {@link DeserializationSchema} for turning the bytes received - * into Java objects. - */ - public RMQSource(String hostName, Integer port, String username, String password, - String queueName, boolean usesCorrelationId, - DeserializationSchema<OUT> deserializationSchema) { + public RMQSource(RMQConnectionConfig rmqConnectionConfig, + String queueName, boolean usesCorrelationId,DeserializationSchema<OUT> deserializationSchema) { super(String.class); - this.hostName = hostName; - this.port = port; - this.username = username; - this.password = password; + this.rmqConnectionConfig = rmqConnectionConfig; this.queueName = queueName; this.usesCorrelationId = usesCorrelationId; this.schema = deserializationSchema; @@ -173,8 +128,8 @@ public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OU * Initializes the connection to RMQ with a default connection factory. The user may override * this method to setup and configure their own ConnectionFactory. */ - protected ConnectionFactory setupConnectionFactory() { - return new ConnectionFactory(); + protected ConnectionFactory setupConnectionFactory() throws Exception { + return rmqConnectionConfig.getConnectionFactory(); } /** @@ -189,18 +144,8 @@ public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OU /** * Initializes the connection to RMQ. */ - private void initializeConnection() { + private void initializeConnection() throws Exception { ConnectionFactory factory = setupConnectionFactory(); - factory.setHost(hostName); - if (port != null) { - factory.setPort(port); - } - if (username != null) { - factory.setUsername(username); - } - if (password != null) { - factory.setPassword(password); - } try { connection = factory.newConnection(); channel = connection.createChannel(); @@ -222,7 +167,7 @@ public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OU } catch (IOException e) { throw new RuntimeException("Cannot create RMQ connection with " + queueName + " at " - + hostName, e); + + rmqConnectionConfig.getHost(), e); } } @@ -240,7 +185,7 @@ public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OU connection.close(); } catch (IOException e) { throw new RuntimeException("Error while closing RMQ connection with " + queueName - + " at " + hostName, e); + + " at " + rmqConnectionConfig.getHost(), e); } } http://git-wip-us.apache.org/repos/asf/flink/blob/86a80336/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java new file mode 100644 index 0000000..0ce7e79 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java @@ -0,0 +1,448 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.rabbitmq.common; + +import com.rabbitmq.client.ConnectionFactory; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.net.URISyntaxException; +import java.security.KeyManagementException; +import java.security.NoSuchAlgorithmException; + +/** + * Connection Configuration for RMQ. + * If {@link Builder#setUri(String)} has been set then {@link RMQConnectionConfig#RMQConnectionConfig(String, Integer, + * Boolean, Boolean, Integer, Integer, Integer, Integer)} + * will be used for initialize the RMQ connection or + * {@link RMQConnectionConfig#RMQConnectionConfig(String, Integer, String, String, String, Integer, Boolean, + * Boolean, Integer, Integer, Integer, Integer)} + * will be used for initialize the RMQ connection + */ +public class RMQConnectionConfig implements Serializable { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(RMQConnectionConfig.class); + + private String host; + private Integer port; + private String virtualHost; + private String username; + private String password; + private String uri; + + private Integer networkRecoveryInterval; + private Boolean automaticRecovery; + private Boolean topologyRecovery; + + private Integer connectionTimeout; + private Integer requestedChannelMax; + private Integer requestedFrameMax; + private Integer requestedHeartbeat; + + /** + * + * @param host host name + * @param port port + * @param virtualHost virtual host + * @param username username + * @param password password + * @param networkRecoveryInterval connection recovery interval in milliseconds + * @param automaticRecovery if automatic connection recovery + * @param topologyRecovery if topology recovery + * @param connectionTimeout connection timeout + * @param requestedChannelMax requested maximum channel number + * @param requestedFrameMax requested maximum frame size + * @param requestedHeartbeat requested heartbeat interval + * @throws NullPointerException if host or virtual host or username or password is null + */ + private RMQConnectionConfig(String host, Integer port, String virtualHost, String username, String password, + Integer networkRecoveryInterval, Boolean automaticRecovery, + Boolean topologyRecovery, Integer connectionTimeout, Integer requestedChannelMax, + Integer requestedFrameMax, Integer requestedHeartbeat){ + Preconditions.checkNotNull(host, "host can not be null"); + Preconditions.checkNotNull(port, "port can not be null"); + Preconditions.checkNotNull(virtualHost, "virtualHost can not be null"); + Preconditions.checkNotNull(username, "username can not be null"); + Preconditions.checkNotNull(password, "password can not be null"); + this.host = host; + this.port = port; + this.virtualHost = virtualHost; + this.username = username; + this.password = password; + + this.networkRecoveryInterval = networkRecoveryInterval; + this.automaticRecovery = automaticRecovery; + this.topologyRecovery = topologyRecovery; + this.connectionTimeout = connectionTimeout; + this.requestedChannelMax = requestedChannelMax; + this.requestedFrameMax = requestedFrameMax; + this.requestedHeartbeat = requestedHeartbeat; + } + + /** + * + * @param uri the connection URI + * @param networkRecoveryInterval connection recovery interval in milliseconds + * @param automaticRecovery if automatic connection recovery + * @param topologyRecovery if topology recovery + * @param connectionTimeout connection timeout + * @param requestedChannelMax requested maximum channel number + * @param requestedFrameMax requested maximum frame size + * @param requestedHeartbeat requested heartbeat interval + * @throws NullPointerException if URI is null + */ + private RMQConnectionConfig(String uri, Integer networkRecoveryInterval, Boolean automaticRecovery, + Boolean topologyRecovery, Integer connectionTimeout, Integer requestedChannelMax, + Integer requestedFrameMax, Integer requestedHeartbeat){ + Preconditions.checkNotNull(uri, "Uri can not be null"); + this.uri = uri; + + this.networkRecoveryInterval = networkRecoveryInterval; + this.automaticRecovery = automaticRecovery; + this.topologyRecovery = topologyRecovery; + this.connectionTimeout = connectionTimeout; + this.requestedChannelMax = requestedChannelMax; + this.requestedFrameMax = requestedFrameMax; + this.requestedHeartbeat = requestedHeartbeat; + } + + /** @return the host to use for connections */ + public String getHost() { + return host; + } + + /** @return the port to use for connections */ + public int getPort() { + return port; + } + + /** + * Retrieve the virtual host. + * @return the virtual host to use when connecting to the broker + */ + public String getVirtualHost() { + return virtualHost; + } + + /** + * Retrieve the user name. + * @return the AMQP user name to use when connecting to the broker + */ + public String getUsername() { + return username; + } + + /** + * Retrieve the password. + * @return the password to use when connecting to the broker + */ + public String getPassword() { + return password; + } + + /** + * Retrieve the URI. + * @return the connection URI when connecting to the broker + */ + public String getUri() { + return uri; + } + + /** + * Returns automatic connection recovery interval in milliseconds. + * @return how long will automatic recovery wait before attempting to reconnect, in ms; default is 5000 + */ + public Integer getNetworkRecoveryInterval() { + return networkRecoveryInterval; + } + + /** + * Returns true if automatic connection recovery is enabled, false otherwise + * @return true if automatic connection recovery is enabled, false otherwise + */ + public Boolean isAutomaticRecovery() { + return automaticRecovery; + } + + /** + * Returns true if topology recovery is enabled, false otherwise + * @return true if topology recovery is enabled, false otherwise + */ + public Boolean isTopologyRecovery() { + return topologyRecovery; + } + + /** + * Retrieve the connection timeout. + * @return the connection timeout, in milliseconds; zero for infinite + */ + public Integer getConnectionTimeout() { + return connectionTimeout; + } + + /** + * Retrieve the requested maximum channel number + * @return the initially requested maximum channel number; zero for unlimited + */ + public Integer getRequestedChannelMax() { + return requestedChannelMax; + } + + /** + * Retrieve the requested maximum frame size + * @return the initially requested maximum frame size, in octets; zero for unlimited + */ + public Integer getRequestedFrameMax() { + return requestedFrameMax; + } + + /** + * Retrieve the requested heartbeat interval. + * @return the initially requested heartbeat interval, in seconds; zero for none + */ + public Integer getRequestedHeartbeat() { + return requestedHeartbeat; + } + + /** + * + * @return Connection Factory for RMQ + * @throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException if Malformed URI has been passed + */ + public ConnectionFactory getConnectionFactory() throws URISyntaxException, + NoSuchAlgorithmException, KeyManagementException { + ConnectionFactory factory = new ConnectionFactory(); + if (this.uri != null && !this.uri.isEmpty()){ + try { + factory.setUri(getUri()); + }catch (URISyntaxException | NoSuchAlgorithmException | KeyManagementException e){ + LOG.error("Failed to parse uri {}", e.getMessage()); + throw e; + } + } else { + factory.setHost(this.host); + factory.setPort(this.port); + factory.setVirtualHost(this.virtualHost); + factory.setUsername(this.username); + factory.setPassword(this.password); + } + + if (this.automaticRecovery != null) { + factory.setAutomaticRecoveryEnabled(this.automaticRecovery); + } + if (this.connectionTimeout != null) { + factory.setConnectionTimeout(this.connectionTimeout); + } + if (this.networkRecoveryInterval != null) { + factory.setNetworkRecoveryInterval(this.networkRecoveryInterval); + } + if (this.requestedHeartbeat != null) { + factory.setRequestedHeartbeat(this.requestedHeartbeat); + } + if (this.topologyRecovery != null) { + factory.setTopologyRecoveryEnabled(this.topologyRecovery); + } + if (this.requestedChannelMax != null) { + factory.setRequestedChannelMax(this.requestedChannelMax); + } + if (this.requestedFrameMax != null) { + factory.setRequestedFrameMax(this.requestedFrameMax); + } + + return factory; + } + + /** + * The Builder Class for {@link RMQConnectionConfig} + */ + public static class Builder { + + private String host; + private Integer port; + private String virtualHost; + private String username; + private String password; + + private Integer networkRecoveryInterval; + private Boolean automaticRecovery; + private Boolean topologyRecovery; + + private Integer connectionTimeout; + private Integer requestedChannelMax; + private Integer requestedFrameMax; + private Integer requestedHeartbeat; + + private String uri; + + /** + * Set the target port. + * @param port the default port to use for connections + * @return the Builder + */ + public Builder setPort(int port) { + this.port = port; + return this; + } + + /** @param host the default host to use for connections + * @return the Builder + */ + public Builder setHost(String host) { + this.host = host; + return this; + } + + /** + * Set the virtual host. + * @param virtualHost the virtual host to use when connecting to the broker + * @return the Builder + */ + public Builder setVirtualHost(String virtualHost) { + this.virtualHost = virtualHost; + return this; + } + + /** + * Set the user name. + * @param username the AMQP user name to use when connecting to the broker + * @return the Builder + */ + public Builder setUserName(String username) { + this.username = username; + return this; + } + + /** + * Set the password. + * @param password the password to use when connecting to the broker + * @return the Builder + */ + public Builder setPassword(String password) { + this.password = password; + return this; + } + + /** + * Convenience method for setting the fields in an AMQP URI: host, + * port, username, password and virtual host. If any part of the + * URI is ommited, the ConnectionFactory's corresponding variable + * is left unchanged. + * @param uri is the AMQP URI containing the data + * @return the Builder + */ + public Builder setUri(String uri) { + this.uri = uri; + return this; + } + + /** + * Enables or disables topology recovery + * @param topologyRecovery if true, enables topology recovery + * @return the Builder + */ + public Builder setTopologyRecoveryEnabled(boolean topologyRecovery) { + this.topologyRecovery = topologyRecovery; + return this; + } + + /** + * Set the requested heartbeat. + * @param requestedHeartbeat the initially requested heartbeat interval, in seconds; zero for none + * @return the Builder + */ + public Builder setRequestedHeartbeat(int requestedHeartbeat) { + this.requestedHeartbeat = requestedHeartbeat; + return this; + } + + /** + * Set the requested maximum frame size + * @param requestedFrameMax initially requested maximum frame size, in octets; zero for unlimited + * @return the Builder + */ + public Builder setRequestedFrameMax(int requestedFrameMax) { + this.requestedFrameMax = requestedFrameMax; + return this; + } + + /** + * Set the requested maximum channel number + * @param requestedChannelMax initially requested maximum channel number; zero for unlimited + */ + public Builder setRequestedChannelMax(int requestedChannelMax) { + this.requestedChannelMax = requestedChannelMax; + return this; + } + + /** + * Sets connection recovery interval. Default is 5000. + * @param networkRecoveryInterval how long will automatic recovery wait before attempting to reconnect, in ms + * @return the Builder + */ + public Builder setNetworkRecoveryInterval(int networkRecoveryInterval) { + this.networkRecoveryInterval = networkRecoveryInterval; + return this; + } + + /** + * Set the connection timeout. + * @param connectionTimeout connection establishment timeout in milliseconds; zero for infinite + * @return the Builder + */ + public Builder setConnectionTimeout(int connectionTimeout) { + this.connectionTimeout = connectionTimeout; + return this; + } + + /** + * Enables or disables automatic connection recovery + * @param automaticRecovery if true, enables connection recovery + * @return the Builder + */ + public Builder setAutomaticRecovery(boolean automaticRecovery) { + this.automaticRecovery = automaticRecovery; + return this; + } + + /** + * The Builder method + * If URI is NULL we use host, port, vHost, username, password combination + * to initialize connection. using {@link RMQConnectionConfig#RMQConnectionConfig(String, Integer, String, String, String, + * Integer, Boolean, Boolean, Integer, Integer, Integer, Integer)} + * + * else URI will be used to initialize the client connection + * {@link RMQConnectionConfig#RMQConnectionConfig(String, Integer, Boolean, Boolean, Integer, Integer, Integer, Integer)} + * @return RMQConnectionConfig + */ + public RMQConnectionConfig build(){ + if(this.uri != null) { + return new RMQConnectionConfig(this.uri, this.networkRecoveryInterval, + this.automaticRecovery, this.topologyRecovery, this.connectionTimeout, this.requestedChannelMax, + this.requestedFrameMax, this.requestedHeartbeat); + } else { + return new RMQConnectionConfig(this.host, this.port, this.virtualHost, this.username, this.password, + this.networkRecoveryInterval, this.automaticRecovery, this.topologyRecovery, + this.connectionTimeout, this.requestedChannelMax, this.requestedFrameMax, this.requestedHeartbeat); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/86a80336/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java b/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java index 21f185f..31128a9 100644 --- a/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java +++ b/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java @@ -32,6 +32,7 @@ import org.apache.flink.runtime.state.SerializedCheckpointData; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig; import org.apache.flink.streaming.util.serialization.DeserializationSchema; import org.junit.After; import org.junit.Before; @@ -220,11 +221,12 @@ public class RMQSourceTest { * Tests whether constructor params are passed correctly. */ @Test - public void testConstructorParams() { + public void testConstructorParams() throws Exception { // verify construction params + RMQConnectionConfig.Builder builder = new RMQConnectionConfig.Builder(); + builder.setHost("hostTest").setPort(999).setUserName("userTest").setPassword("passTest").setVirtualHost("/"); ConstructorTestClass testObj = new ConstructorTestClass( - "hostTest", 999, "userTest", "passTest", - "queueTest", false, new StringDeserializationScheme()); + builder.build(), "queueTest", false, new StringDeserializationScheme()); try { testObj.open(new Configuration()); @@ -240,17 +242,16 @@ public class RMQSourceTest { private static class ConstructorTestClass extends RMQSource<String> { - private ConnectionFactory factory = Mockito.spy(new ConnectionFactory()); - - public ConstructorTestClass(String hostName, Integer port, - String username, - String password, - String queueName, - boolean usesCorrelationId, - DeserializationSchema<String> deserializationSchema) { - super(hostName, port, username, password, - queueName, usesCorrelationId, deserializationSchema); + private ConnectionFactory factory; + public ConstructorTestClass(RMQConnectionConfig rmqConnectionConfig, + String queueName, + boolean usesCorrelationId, + DeserializationSchema<String> deserializationSchema) throws Exception { + super(rmqConnectionConfig, queueName, usesCorrelationId, deserializationSchema); + RMQConnectionConfig.Builder builder = new RMQConnectionConfig.Builder(); + builder.setHost("hostTest").setPort(999).setUserName("userTest").setPassword("passTest").setVirtualHost("/"); + factory = Mockito.spy(builder.build().getConnectionFactory()); try { Mockito.doThrow(new RuntimeException()).when(factory).newConnection(); } catch (IOException e) { @@ -295,7 +296,9 @@ public class RMQSourceTest { private class RMQTestSource extends RMQSource<String> { public RMQTestSource() { - super("hostDummy", -1, "", "", "queueDummy", true, new StringDeserializationScheme()); + super(new RMQConnectionConfig.Builder().setHost("hostTest") + .setPort(999).setUserName("userTest").setPassword("passTest").setVirtualHost("/").build() + , "queueDummy", true, new StringDeserializationScheme()); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/86a80336/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfigTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfigTest.java b/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfigTest.java new file mode 100644 index 0000000..40985ce --- /dev/null +++ b/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfigTest.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.rabbitmq.common; + +import com.rabbitmq.client.ConnectionFactory; +import org.junit.Test; + +import java.net.URISyntaxException; +import java.security.KeyManagementException; +import java.security.NoSuchAlgorithmException; + +import static org.junit.Assert.assertEquals; + + +public class RMQConnectionConfigTest { + + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointExceptionIfHostIsNull() throws NoSuchAlgorithmException, + KeyManagementException, URISyntaxException { + RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder() + .setPort(1000).setUserName("guest") + .setPassword("guest").setVirtualHost("/").build(); + connectionConfig.getConnectionFactory(); + } + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointExceptionIfPortIsNull() throws NoSuchAlgorithmException, + KeyManagementException, URISyntaxException { + RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder() + .setHost("localhost").setUserName("guest") + .setPassword("guest").setVirtualHost("/").build(); + connectionConfig.getConnectionFactory(); + } + + @Test(expected = NullPointerException.class) + public void shouldSetDefaultValueIfConnectionTimeoutNotGiven() throws NoSuchAlgorithmException, + KeyManagementException, URISyntaxException { + RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder() + .setHost("localhost").setUserName("guest") + .setPassword("guest").setVirtualHost("/").build(); + ConnectionFactory factory = connectionConfig.getConnectionFactory(); + assertEquals(ConnectionFactory.DEFAULT_CONNECTION_TIMEOUT, factory.getConnectionTimeout()); + } + + @Test + public void shouldSetProvidedValueIfConnectionTimeoutNotGiven() throws NoSuchAlgorithmException, + KeyManagementException, URISyntaxException { + RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder() + .setHost("localhost").setPort(5000).setUserName("guest") + .setPassword("guest").setVirtualHost("/") + .setConnectionTimeout(5000).build(); + ConnectionFactory factory = connectionConfig.getConnectionFactory(); + assertEquals(5000, factory.getConnectionTimeout()); + } +}