[
https://issues.apache.org/jira/browse/FLINK-3763?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15308479#comment-15308479
]
ASF GitHub Bot commented on FLINK-3763:
---------------------------------------
Github user rmetzger commented on a diff in the pull request:
https://github.com/apache/flink/pull/2054#discussion_r65250839
--- Diff:
flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java
---
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq.common;
+
+import com.google.common.base.Preconditions;
+import com.rabbitmq.client.ConnectionFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+/**
+ * Connection Configuration for RMQ.
+ * If {@link Builder#setUri(String)} has been set then {@link
RMQConnectionConfig#RMQConnectionConfig(String, int, boolean, boolean, int,
int, int, int)}
+ * will be used for initialize the RMQ connection or
+ * {@link RMQConnectionConfig#RMQConnectionConfig(String, int, String,
String, String, int, boolean, boolean, int, int, int, int)}
+ * will be used for initialize the RMQ connection
+ */
+public class RMQConnectionConfig implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG =
LoggerFactory.getLogger(RMQConnectionConfig.class);
+
+ private String host;
+ private int port;
+ private String virtualHost;
+ private String username;
+ private String password;
+ private String uri;
+
+ private int networkRecoveryInterval;
+ private boolean automaticRecovery;
+ private boolean topologyRecovery;
+
+ private int connectionTimeout;
+ private int requestedChannelMax;
+ private int requestedFrameMax;
+ private int requestedHeartbeat;
+
+ /**
+ *
+ * @param host host name
+ * @param port port
+ * @param virtualHost virtual host
+ * @param username username
+ * @param password password
+
+ * @param networkRecoveryInterval connection recovery interval in
milliseconds
+ * @param automaticRecovery if automatic connection recovery
+ * @param topologyRecovery if topology recovery
+ * @param connectionTimeout connection timeout
+ * @param requestedChannelMax requested maximum channel number
+ * @param requestedFrameMax requested maximum frame size
+ * @param requestedHeartbeat requested heartbeat interval
+ * @throws NullPointerException if host or virtual host or username or
password is null
+ */
+ private RMQConnectionConfig(String host, int port, String virtualHost,
String username, String password,
+ int
networkRecoveryInterval, boolean automaticRecovery,
+ boolean
topologyRecovery, int connectionTimeout, int requestedChannelMax, int
requestedFrameMax,
+ int
requestedHeartbeat){
+ Preconditions.checkNotNull(host, "host can not be null");
+ Preconditions.checkNotNull(virtualHost, "virtualHost can not be
null");
+ Preconditions.checkNotNull(username, "username can not be
null");
+ Preconditions.checkNotNull(password, "password can not be
null");
+ this.host = host;
+ this.port = port;
+ this.virtualHost = virtualHost;
+ this.username = username;
+ this.password = password;
+
+ this.networkRecoveryInterval = networkRecoveryInterval;
+ this.automaticRecovery = automaticRecovery;
+ this.topologyRecovery = topologyRecovery;
+ this.connectionTimeout = connectionTimeout;
+ this.requestedChannelMax = requestedChannelMax;
+ this.requestedFrameMax = requestedFrameMax;
+ this.requestedHeartbeat = requestedHeartbeat;
+ }
+
+ /**
+ *
+ * @param uri the connection URI
+ * @param networkRecoveryInterval connection recovery interval in
milliseconds
+ * @param automaticRecovery if automatic connection recovery
+ * @param topologyRecovery if topology recovery
+ * @param connectionTimeout connection timeout
+ * @param requestedChannelMax requested maximum channel number
+ * @param requestedFrameMax requested maximum frame size
+ * @param requestedHeartbeat requested heartbeat interval
+ * @throws NullPointerException if URI is null
--- End diff --
It seems that the indentation here is done using tabs and spaces.
> RabbitMQ Source/Sink standardize connection parameters
> ------------------------------------------------------
>
> Key: FLINK-3763
> URL: https://issues.apache.org/jira/browse/FLINK-3763
> Project: Flink
> Issue Type: Improvement
> Components: Streaming Connectors
> Affects Versions: 1.0.1
> Reporter: Robert Batts
> Assignee: Subhankar Biswas
>
> The RabbitMQ source and sink should have the same capabilities in terms of
> establishing a connection, currently the sink is lacking connection
> parameters that are available on the source. Additionally, VirtualHost should
> be an offered parameter for multi-tenant RabbitMQ clusters (if not specified
> it goes to the vhost '/').
> Connection Parameters
> ===================
> - Host - Offered on both
> - Port - Source only
> - Virtual Host - Neither
> - User - Source only
> - Password - Source only
> Additionally, it might be worth offer the URI as a valid constructor because
> that would offer all 5 of the above parameters in a single String.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)