morazow commented on code in PR #29:
URL: 
https://github.com/apache/flink-connector-rabbitmq/pull/29#discussion_r1601778225


##########
flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/common/RabbitMQConnectionConfig.java:
##########
@@ -0,0 +1,465 @@
+package org.apache.flink.connector.rabbitmq.common;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
+import 
org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+/** Configuration class for RabbitMQ connections. */
+@PublicEvolving
+public class RabbitMQConnectionConfig implements Serializable {
+    private static final long DEFAULT_DELIVERY_TIMEOUT = 30000;
+
+    private String host;
+    private Integer port;
+    private String virtualHost;
+    private String username;
+    private String password;
+    private String uri;
+
+    private Integer networkRecoveryInterval;
+    private Boolean automaticRecovery;
+    private Boolean topologyRecovery;
+
+    private Integer connectionTimeout;
+    private Integer requestedChannelMax;
+    private Integer requestedFrameMax;
+    private Integer requestedHeartbeat;

Review Comment:
   Change these (Integer, Boolean) aslo to primitive types?
   Similarly the setters are all in the builder so I'd suggest to make them 
final



##########
flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/common/RabbitMQConnectionConfig.java:
##########
@@ -0,0 +1,465 @@
+package org.apache.flink.connector.rabbitmq.common;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
+import 
org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+/** Configuration class for RabbitMQ connections. */
+@PublicEvolving
+public class RabbitMQConnectionConfig implements Serializable {
+    private static final long DEFAULT_DELIVERY_TIMEOUT = 30000;
+
+    private String host;
+    private Integer port;
+    private String virtualHost;
+    private String username;
+    private String password;
+    private String uri;
+
+    private Integer networkRecoveryInterval;
+    private Boolean automaticRecovery;
+    private Boolean topologyRecovery;
+
+    private Integer connectionTimeout;
+    private Integer requestedChannelMax;
+    private Integer requestedFrameMax;
+    private Integer requestedHeartbeat;
+
+    private Integer prefetchCount;
+    private final long deliveryTimeout;
+
+    protected RabbitMQConnectionConfig(RabbitMQConnectionConfig.Builder 
builder) {
+        Preconditions.checkArgument(
+                builder.uri != null || (builder.host != null && builder.port 
!= null),
+                "Either URI or host/port must be set");
+        if (builder.uri == null) {
+            Preconditions.checkNotNull(
+                    builder.username, "username can not be null when host/port 
is set");
+            Preconditions.checkNotNull(
+                    builder.password, "password can not be null when host/port 
is set");
+        }
+
+        Preconditions.checkArgument(
+                builder.deliveryTimeout == null || builder.deliveryTimeout > 0,
+                "deliveryTimeout must be positive");
+        this.uri = builder.uri;
+        this.host = builder.host;
+        this.port = builder.port;
+        this.virtualHost = builder.virtualHost;
+        this.username = builder.username;
+        this.password = builder.password;
+
+        this.networkRecoveryInterval = builder.networkRecoveryInterval;
+        this.automaticRecovery = builder.automaticRecovery;
+        this.topologyRecovery = builder.topologyRecovery;
+        this.connectionTimeout = builder.connectionTimeout;
+        this.requestedChannelMax = builder.requestedChannelMax;
+        this.requestedFrameMax = builder.requestedFrameMax;
+        this.requestedHeartbeat = builder.requestedHeartbeat;
+        this.prefetchCount = builder.prefetchCount;
+        this.deliveryTimeout =
+                
Optional.ofNullable(builder.deliveryTimeout).orElse(DEFAULT_DELIVERY_TIMEOUT);
+    }
+
+    /**
+     * @param uri the connection URI
+     * @param networkRecoveryInterval connection recovery interval in 
milliseconds
+     * @param automaticRecovery if automatic connection recovery
+     * @param topologyRecovery if topology recovery
+     * @param connectionTimeout connection timeout
+     * @param requestedChannelMax requested maximum channel number
+     * @param requestedFrameMax requested maximum frame size
+     * @param requestedHeartbeat requested heartbeat interval
+     * @param deliveryTimeout message delivery timeout in the queueing consumer
+     * @throws NullPointerException if URI is null
+     */
+    protected RabbitMQConnectionConfig(
+            String uri,
+            Integer networkRecoveryInterval,
+            Boolean automaticRecovery,
+            Boolean topologyRecovery,
+            Integer connectionTimeout,
+            Integer requestedChannelMax,
+            Integer requestedFrameMax,
+            Integer requestedHeartbeat,
+            Integer prefetchCount,
+            Long deliveryTimeout) {
+        Preconditions.checkNotNull(uri, "Uri can not be null");
+        Preconditions.checkArgument(
+                deliveryTimeout == null || deliveryTimeout > 0, 
"deliveryTimeout must be positive");
+        this.uri = uri;
+
+        this.networkRecoveryInterval = networkRecoveryInterval;
+        this.automaticRecovery = automaticRecovery;
+        this.topologyRecovery = topologyRecovery;
+        this.connectionTimeout = connectionTimeout;
+        this.requestedChannelMax = requestedChannelMax;
+        this.requestedFrameMax = requestedFrameMax;
+        this.requestedHeartbeat = requestedHeartbeat;
+        this.prefetchCount = prefetchCount;
+        this.deliveryTimeout =
+                
Optional.ofNullable(deliveryTimeout).orElse(DEFAULT_DELIVERY_TIMEOUT);
+    }
+
+    /** @return the host to use for connections */
+    public String getHost() {
+        return host;
+    }
+
+    /** @return the port to use for connections */
+    public int getPort() {
+        return port;

Review Comment:
   This could be null if the `URI` is set?



##########
flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/common/RabbitMQConnectionSetupException.java:
##########
@@ -0,0 +1,13 @@
+package org.apache.flink.connector.rabbitmq.common;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.IOException;
+
+/** Exception for RabbitMQ connection setup errors. */
+@PublicEvolving

Review Comment:
   Do we need this annotation?



##########
flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/RabbitMQSinkWriter.java:
##########
@@ -0,0 +1,260 @@
+package org.apache.flink.connector.rabbitmq.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
+import org.apache.flink.connector.rabbitmq.common.RabbitMQConnectionConfig;
+import org.apache.flink.connector.rabbitmq.common.RabbitMQMessage;
+import org.apache.flink.connector.rabbitmq.common.RabbitMQMessageConverter;
+import org.apache.flink.connector.rabbitmq.common.SerializableReturnListener;
+import org.apache.flink.connector.rabbitmq.common.util.RabbitMQConnectionUtil;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.ConfirmCallback;
+import com.rabbitmq.client.Connection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.TreeMap;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * A {@link SinkWriter} to produce data into RabbitMQ. The sink uses the {@link
+ * RabbitMQConnectionConfig} to create a connection to RabbitMQ, uses {@link
+ * RabbitMQMessageConverter} to convert the input data to {@link
+ * org.apache.flink.connector.rabbitmq.common.RabbitMQMessage}, and uses {@link
+ * com.rabbitmq.client.Channel} to send messages to the specified queue.
+ *
+ * <p>The sink writer is stateless and blocks for new writes if the number of 
inflight messages
+ * exceeds the maximum number of inflight messages. The sink writer also 
blocks for inflight

Review Comment:
   ```suggestion
    * exceeds the maximum number of inflight messages parameter. The sink 
writer also blocks for inflight
   ```



##########
flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/common/RabbitMQConnectionConfig.java:
##########
@@ -0,0 +1,465 @@
+package org.apache.flink.connector.rabbitmq.common;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
+import 
org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+/** Configuration class for RabbitMQ connections. */
+@PublicEvolving
+public class RabbitMQConnectionConfig implements Serializable {
+    private static final long DEFAULT_DELIVERY_TIMEOUT = 30000;
+
+    private String host;
+    private Integer port;
+    private String virtualHost;
+    private String username;
+    private String password;
+    private String uri;
+
+    private Integer networkRecoveryInterval;
+    private Boolean automaticRecovery;
+    private Boolean topologyRecovery;
+
+    private Integer connectionTimeout;
+    private Integer requestedChannelMax;
+    private Integer requestedFrameMax;
+    private Integer requestedHeartbeat;

Review Comment:
   Reading further I understood the reasoning is they can be null, but still I 
would argue we should have nice defaults for them



##########
flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/common/Constants.java:
##########
@@ -0,0 +1,13 @@
+package org.apache.flink.connector.rabbitmq.common;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/** Constants for the RabbitMQ connector. */
+@PublicEvolving
+public class Constants {

Review Comment:
   It'd good to add Javadoc to these values



##########
flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/common/RabbitMQConnectionConfig.java:
##########
@@ -0,0 +1,465 @@
+package org.apache.flink.connector.rabbitmq.common;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
+import 
org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+/** Configuration class for RabbitMQ connections. */
+@PublicEvolving
+public class RabbitMQConnectionConfig implements Serializable {
+    private static final long DEFAULT_DELIVERY_TIMEOUT = 30000;
+
+    private String host;
+    private Integer port;
+    private String virtualHost;
+    private String username;
+    private String password;
+    private String uri;
+
+    private Integer networkRecoveryInterval;
+    private Boolean automaticRecovery;
+    private Boolean topologyRecovery;
+
+    private Integer connectionTimeout;
+    private Integer requestedChannelMax;
+    private Integer requestedFrameMax;
+    private Integer requestedHeartbeat;
+
+    private Integer prefetchCount;
+    private final long deliveryTimeout;
+
+    protected RabbitMQConnectionConfig(RabbitMQConnectionConfig.Builder 
builder) {
+        Preconditions.checkArgument(
+                builder.uri != null || (builder.host != null && builder.port 
!= null),
+                "Either URI or host/port must be set");
+        if (builder.uri == null) {
+            Preconditions.checkNotNull(
+                    builder.username, "username can not be null when host/port 
is set");
+            Preconditions.checkNotNull(
+                    builder.password, "password can not be null when host/port 
is set");
+        }
+
+        Preconditions.checkArgument(
+                builder.deliveryTimeout == null || builder.deliveryTimeout > 0,
+                "deliveryTimeout must be positive");
+        this.uri = builder.uri;
+        this.host = builder.host;
+        this.port = builder.port;
+        this.virtualHost = builder.virtualHost;
+        this.username = builder.username;
+        this.password = builder.password;
+
+        this.networkRecoveryInterval = builder.networkRecoveryInterval;
+        this.automaticRecovery = builder.automaticRecovery;
+        this.topologyRecovery = builder.topologyRecovery;
+        this.connectionTimeout = builder.connectionTimeout;
+        this.requestedChannelMax = builder.requestedChannelMax;
+        this.requestedFrameMax = builder.requestedFrameMax;
+        this.requestedHeartbeat = builder.requestedHeartbeat;
+        this.prefetchCount = builder.prefetchCount;
+        this.deliveryTimeout =
+                
Optional.ofNullable(builder.deliveryTimeout).orElse(DEFAULT_DELIVERY_TIMEOUT);
+    }
+
+    /**
+     * @param uri the connection URI
+     * @param networkRecoveryInterval connection recovery interval in 
milliseconds
+     * @param automaticRecovery if automatic connection recovery
+     * @param topologyRecovery if topology recovery
+     * @param connectionTimeout connection timeout
+     * @param requestedChannelMax requested maximum channel number
+     * @param requestedFrameMax requested maximum frame size
+     * @param requestedHeartbeat requested heartbeat interval
+     * @param deliveryTimeout message delivery timeout in the queueing consumer
+     * @throws NullPointerException if URI is null
+     */
+    protected RabbitMQConnectionConfig(
+            String uri,
+            Integer networkRecoveryInterval,
+            Boolean automaticRecovery,
+            Boolean topologyRecovery,
+            Integer connectionTimeout,
+            Integer requestedChannelMax,
+            Integer requestedFrameMax,
+            Integer requestedHeartbeat,
+            Integer prefetchCount,
+            Long deliveryTimeout) {
+        Preconditions.checkNotNull(uri, "Uri can not be null");
+        Preconditions.checkArgument(
+                deliveryTimeout == null || deliveryTimeout > 0, 
"deliveryTimeout must be positive");
+        this.uri = uri;
+
+        this.networkRecoveryInterval = networkRecoveryInterval;
+        this.automaticRecovery = automaticRecovery;
+        this.topologyRecovery = topologyRecovery;
+        this.connectionTimeout = connectionTimeout;
+        this.requestedChannelMax = requestedChannelMax;
+        this.requestedFrameMax = requestedFrameMax;
+        this.requestedHeartbeat = requestedHeartbeat;
+        this.prefetchCount = prefetchCount;
+        this.deliveryTimeout =
+                
Optional.ofNullable(deliveryTimeout).orElse(DEFAULT_DELIVERY_TIMEOUT);
+    }
+
+    /** @return the host to use for connections */
+    public String getHost() {
+        return host;
+    }
+
+    /** @return the port to use for connections */
+    public int getPort() {
+        return port;
+    }
+
+    /**
+     * Retrieve the virtual host.
+     *
+     * @return the virtual host to use when connecting to the broker
+     */
+    public String getVirtualHost() {
+        return virtualHost;
+    }
+
+    /**
+     * Retrieve the user name.
+     *
+     * @return the AMQP user name to use when connecting to the broker
+     */
+    public String getUsername() {
+        return username;
+    }
+
+    /**
+     * Retrieve the password.
+     *
+     * @return the password to use when connecting to the broker
+     */
+    public String getPassword() {
+        return password;
+    }
+
+    /**
+     * Retrieve the URI.
+     *
+     * @return the connection URI when connecting to the broker
+     */
+    public String getUri() {
+        return uri;
+    }
+
+    /**
+     * Returns automatic connection recovery interval in milliseconds.
+     *
+     * @return how long will automatic recovery wait before attempting to 
reconnect, in ms; default
+     *     is 5000
+     */
+    public Integer getNetworkRecoveryInterval() {
+        return networkRecoveryInterval;
+    }
+
+    /**
+     * Returns true if automatic connection recovery is enabled, false 
otherwise.
+     *
+     * @return true if automatic connection recovery is enabled, false 
otherwise
+     */
+    public Boolean isAutomaticRecovery() {
+        return automaticRecovery;
+    }
+
+    /**
+     * Returns true if topology recovery is enabled, false otherwise.
+     *
+     * @return true if topology recovery is enabled, false otherwise
+     */
+    public Boolean isTopologyRecovery() {
+        return topologyRecovery;
+    }
+
+    /**
+     * Retrieve the connection timeout.
+     *
+     * @return the connection timeout, in milliseconds; zero for infinite
+     */
+    public Integer getConnectionTimeout() {
+        return connectionTimeout;
+    }
+
+    /**
+     * Retrieve the requested maximum channel number.
+     *
+     * @return the initially requested maximum channel number; zero for 
unlimited
+     */
+    public Integer getRequestedChannelMax() {
+        return requestedChannelMax;
+    }
+
+    /**
+     * Retrieve the requested maximum frame size.
+     *
+     * @return the initially requested maximum frame size, in octets; zero for 
unlimited
+     */
+    public Integer getRequestedFrameMax() {
+        return requestedFrameMax;
+    }
+
+    /**
+     * Retrieve the requested heartbeat interval.
+     *
+     * @return the initially requested heartbeat interval, in seconds; zero 
for none
+     */
+    public Integer getRequestedHeartbeat() {
+        return requestedHeartbeat;
+    }
+
+    /**
+     * Retrieve the channel prefetch count.
+     *
+     * @return an Optional of the prefetch count, if set, for the consumer 
channel
+     */
+    public Optional<Integer> getPrefetchCount() {
+        return Optional.ofNullable(prefetchCount);
+    }
+
+    /**
+     * Retrieve the message delivery timeout used in the queueing consumer. If 
not specified
+     * explicitly, the default value of 30000 milliseconds will be returned.
+     *
+     * @return the message delivery timeout, in milliseconds
+     */
+    public long getDeliveryTimeout() {
+        return deliveryTimeout;
+    }
+
+    public static Builder<RabbitMQConnectionConfig.Builder<?>> builder() {
+        return new Builder<>();
+    }
+
+    /** The Builder Class for {@link RMQConnectionConfig}. */
+    public static class Builder<T extends RabbitMQConnectionConfig.Builder<?>> 
{
+
+        private String host;
+        private Integer port;
+        private String virtualHost;
+        private String username;
+        private String password;
+
+        private Integer networkRecoveryInterval;
+        private Boolean automaticRecovery;
+        private Boolean topologyRecovery;
+
+        private Integer connectionTimeout;
+        private Integer requestedChannelMax;
+        private Integer requestedFrameMax;
+        private Integer requestedHeartbeat;
+
+        // basicQos options for consumers
+        private Integer prefetchCount;
+
+        private Long deliveryTimeout;
+
+        private String uri;
+
+        /**
+         * Set the target port.
+         *
+         * @param port the default port to use for connections
+         * @return the Builder
+         */
+        public T setPort(int port) {
+            this.port = port;
+            return (T) this;
+        }
+
+        /**
+         * @param host the default host to use for connections
+         * @return the Builder
+         */
+        public T setHost(String host) {
+            this.host = host;
+            return (T) this;
+        }
+
+        /**
+         * Set the virtual host.
+         *
+         * @param virtualHost the virtual host to use when connecting to the 
broker
+         * @return the Builder
+         */
+        public T setVirtualHost(String virtualHost) {
+            this.virtualHost = virtualHost;
+            return (T) this;
+        }
+
+        /**
+         * Set the user name.
+         *
+         * @param username the AMQP user name to use when connecting to the 
broker
+         * @return the Builder
+         */
+        public T setUserName(String username) {
+            this.username = username;
+            return (T) this;
+        }
+
+        /**
+         * Set the password.
+         *
+         * @param password the password to use when connecting to the broker
+         * @return the Builder
+         */
+        public T setPassword(String password) {
+            this.password = password;
+            return (T) this;
+        }
+
+        /**
+         * Convenience method for setting the fields in an AMQP URI: host, 
port, username, password
+         * and virtual host. If any part of the URI is omitted, the 
ConnectionFactory's

Review Comment:
   Add `{@link }` to ConnectionFactory



##########
flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/RabbitMQSinkWriter.java:
##########
@@ -0,0 +1,260 @@
+package org.apache.flink.connector.rabbitmq.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
+import org.apache.flink.connector.rabbitmq.common.RabbitMQConnectionConfig;
+import org.apache.flink.connector.rabbitmq.common.RabbitMQMessage;
+import org.apache.flink.connector.rabbitmq.common.RabbitMQMessageConverter;
+import org.apache.flink.connector.rabbitmq.common.SerializableReturnListener;
+import org.apache.flink.connector.rabbitmq.common.util.RabbitMQConnectionUtil;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.ConfirmCallback;
+import com.rabbitmq.client.Connection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.TreeMap;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * A {@link SinkWriter} to produce data into RabbitMQ. The sink uses the {@link
+ * RabbitMQConnectionConfig} to create a connection to RabbitMQ, uses {@link
+ * RabbitMQMessageConverter} to convert the input data to {@link
+ * org.apache.flink.connector.rabbitmq.common.RabbitMQMessage}, and uses {@link
+ * com.rabbitmq.client.Channel} to send messages to the specified queue.
+ *
+ * <p>The sink writer is stateless and blocks for new writes if the number of 
inflight messages
+ * exceeds the maximum number of inflight messages. The sink writer also 
blocks for inflight
+ * messages before taking snapshots.
+ *
+ * @param <T> input type for the sink.
+ */
+@Internal
+public class RabbitMQSinkWriter<T> implements SinkWriter<T> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(RabbitMQSinkWriter.class);
+
+    /** The name of the queue to send messages to. */
+    private final String queueName;
+
+    /**
+     * The message converter to convert the input data to {@link 
RabbitMQMessage}.
+     *
+     * <p>The MessageConverter also is responsible of defining the routing 
schema of message
+     * publishing, by implementing the {@link 
RabbitMQMessageConverter#supportsExchangeRouting()}
+     * method to signal if the converter supports exchange routing.
+     */
+    private final RabbitMQMessageConverter<T> messageConverter;
+
+    private final SerializationSchema<T> serializationSchema;
+
+    private final SerializableReturnListener returnListener;
+
+    /* Counter for number of bytes this sink has attempted to send to the 
destination. */
+    private final Counter numBytesOutCounter;
+
+    /* Counter for number of records this sink has attempted to send to the 
destination. */
+    private final Counter numRecordsOutCounter;
+
+    private final MailboxExecutor mailboxExecutor;
+
+    /**
+     * The maximum number of inflight messages. The sink writer blocks for new 
writes if the number
+     * of inflight messages exceeds this value.
+     */
+    private final int maximumInflightMessages;
+
+    /* Flag to indicate if the sink should fail on error. */
+    private final boolean failOnError;
+
+    /**
+     * Map to hold inflightMessages, the {@code getSuccessConfirmCallback} and 
{@code
+     * getFailureConfirmCallback} are triggered using sequence numbers hence 
we keep the mapping of
+     * sequence number to the message. We are using a sorted map to evict all 
inflight requests with
+     * sequence number less than or equal to the sequence number of the 
message that was
+     * acknowledged if flagged to acknowledge all previous messages.
+     */
+    private final TreeMap<Long, RabbitMQMessage<T>> inflightMessages;
+
+    private Connection connection;
+    private Channel channel;
+
+    public RabbitMQSinkWriter(
+            WriterInitContext context,
+            RabbitMQConnectionConfig connectionConfig,
+            String queueName,
+            RabbitMQMessageConverter<T> messageConverter,
+            SerializationSchema<T> serializationSchema,
+            SerializableReturnListener returnListener,
+            int maximumInflightMessages,
+            boolean failOnError)
+            throws IOException {
+        this(
+                context,
+                queueName,
+                messageConverter,
+                serializationSchema,
+                returnListener,
+                maximumInflightMessages,
+                failOnError);
+        Preconditions.checkNotNull(connectionConfig, "connectionConfig cannot 
be null");
+        try {
+            Connection connection =
+                    
RabbitMQConnectionUtil.getConnectionFactory(connectionConfig).newConnection();
+            initializeConnection(connection);
+        } catch (TimeoutException e) {
+            throw new IOException("Failed to create connection", e);
+        }
+    }
+
+    @VisibleForTesting
+    RabbitMQSinkWriter(
+            WriterInitContext context,
+            String queueName,
+            RabbitMQMessageConverter<T> messageConverter,
+            SerializationSchema<T> serializationSchema,
+            SerializableReturnListener returnListener,
+            int maximumInflightMessages,
+            boolean failOnError) {
+        Preconditions.checkNotNull(context, "context cannot be null");
+
+        this.mailboxExecutor = context.getMailboxExecutor();
+        this.maximumInflightMessages = maximumInflightMessages;
+        this.failOnError = failOnError;
+
+        SinkWriterMetricGroup metricGroup = context.metricGroup();
+        this.queueName = queueName;
+        this.messageConverter = messageConverter;
+        this.serializationSchema = serializationSchema;
+        this.returnListener = returnListener;
+
+        this.numBytesOutCounter = 
metricGroup.getIOMetricGroup().getNumBytesOutCounter();
+        this.numRecordsOutCounter = 
metricGroup.getIOMetricGroup().getNumRecordsOutCounter();
+        this.inflightMessages = new TreeMap<>();
+    }
+
+    @VisibleForTesting
+    void initializeConnection(Connection connection) throws IOException {
+        this.connection = connection;
+        this.channel = connection.createChannel();
+        channel.addReturnListener(returnListener);
+        channel.addConfirmListener(getSuccessConfirmCallback(), 
getFailureConfirmCallback());
+        channel.confirmSelect();
+        channel.queueDeclare(queueName, true, false, false, null);
+    }
+
+    @Override
+    public void write(T t, Context context) throws IOException, 
InterruptedException {
+        awaitInflightMessagesBelow(maximumInflightMessages);
+        RabbitMQMessage<T> recordMessage = 
messageConverter.toRabbitMQMessage(t);
+        publishMessage(recordMessage);
+    }
+
+    @Override
+    public void flush(boolean endOfInput) throws IOException, 
InterruptedException {
+        awaitInflightMessagesBelow(1);
+    }
+
+    @Override
+    public void close() throws Exception {
+        awaitInflightMessagesBelow(1);
+        channel.close();
+        connection.close();
+    }
+
+    private void awaitInflightMessagesBelow(int maximumInflightMessages)
+            throws InterruptedException {
+        while (inflightMessages.size() >= maximumInflightMessages) {
+            mailboxExecutor.yield();
+        }
+    }
+
+    private void publishMessage(RabbitMQMessage<T> recordMessage) throws 
IOException {
+        byte[] message = 
serializationSchema.serialize(recordMessage.getMessage());
+        Long sequenceNumber = channel.getNextPublishSeqNo();
+
+        if (messageConverter.supportsExchangeRouting()) {
+            Preconditions.checkArgument(
+                    returnListener != null
+                            || !(recordMessage.isImmediate() || 
recordMessage.isMandatory()),
+                    "Return listener must be set if immediate or mandatory 
delivery is requested");
+
+            channel.basicPublish(
+                    recordMessage.getExchange(),
+                    recordMessage.getRoutingKey(),
+                    recordMessage.isMandatory(),
+                    recordMessage.isImmediate(),
+                    recordMessage.getMessageProperties(),
+                    message);
+
+        } else {
+            channel.basicPublish(
+                    recordMessage.getExchange(),
+                    queueName,
+                    recordMessage.getMessageProperties(),
+                    message);
+        }
+
+        inflightMessages.put(sequenceNumber, recordMessage);
+    }
+
+    private ConfirmCallback getSuccessConfirmCallback() {
+        return (seqNo, acknowledgePrevious) ->
+                mailboxExecutor.execute(
+                        () -> {
+                            if (acknowledgePrevious) {
+                                LOG.debug(
+                                        "Acknowledge all messages with 
sequence number less than or equal to {}",
+                                        seqNo);
+                                while (!inflightMessages.isEmpty()
+                                        && inflightMessages.firstKey() <= 
(Long) seqNo) {
+                                    RabbitMQMessage<T> message =
+                                            
inflightMessages.remove(inflightMessages.firstKey());
+                                    numBytesOutCounter.inc(
+                                            
serializationSchema.serialize(message.getMessage())
+                                                    .length);
+                                    numRecordsOutCounter.inc();
+                                }
+                            } else {
+                                LOG.debug("Acknowledge message with sequence 
number {}", seqNo);
+                                RabbitMQMessage<T> message = 
inflightMessages.remove(seqNo);
+                                numBytesOutCounter.inc(
+                                        
serializationSchema.serialize(message.getMessage()).length);
+                                numRecordsOutCounter.inc();
+                            }
+                        },
+                        "Acknowledge message with sequence number " + seqNo);
+    }
+
+    private ConfirmCallback getFailureConfirmCallback() {
+        return (seqNo, acknowledgePrevious) ->
+                mailboxExecutor.execute(
+                        () -> {
+                            if (failOnError) {
+                                LOG.error(
+                                        "Failed to send message with sequence 
number {} and payload {}",
+                                        seqNo,
+                                        
inflightMessages.get(seqNo).getMessage());
+                                throw new FlinkRuntimeException(
+                                        String.format(
+                                                "Failed to send message with 
sequence number %d and payload %s",
+                                                seqNo, 
inflightMessages.get(seqNo).getMessage()));
+                            }
+                            LOG.warn(
+                                    "Resending failed message with sequence 
number {} and payload {}",
+                                    seqNo,
+                                    inflightMessages.get(seqNo).getMessage());
+                            publishMessage(inflightMessages.remove(seqNo));

Review Comment:
   Similarly we can think about retry threshold



##########
flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/RabbitMQSink.java:
##########
@@ -0,0 +1,86 @@
+package org.apache.flink.connector.rabbitmq.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
+import org.apache.flink.connector.rabbitmq.common.RabbitMQConnectionConfig;
+import org.apache.flink.connector.rabbitmq.common.RabbitMQMessageConverter;
+import org.apache.flink.connector.rabbitmq.common.SerializableReturnListener;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * A RabbitMQ {@link Sink} to produce data into RabbitMQ. The sink uses the 
{@link
+ * RabbitMQConnectionConfig} to create a connection to RabbitMQ, uses {@link
+ * RabbitMQMessageConverter} to convert the input data to {@link
+ * org.apache.flink.connector.rabbitmq.common.RabbitMQMessage}, and uses {@link
+ * com.rabbitmq.client.Channel} to send messages to the specified queue.
+ *
+ * @param <T> input type for the sink.
+ */
+@PublicEvolving
+public class RabbitMQSink<T> implements Sink<T> {
+
+    private final RabbitMQConnectionConfig connectionConfig;
+
+    private final SerializationSchema<T> serializationSchema;
+
+    private final RabbitMQMessageConverter<T> messageConverter;
+
+    private final SerializableReturnListener returnListener;
+
+    private final String queueName;
+
+    private final int maximumInflightMessages;
+
+    private final boolean failOnError;
+
+    public RabbitMQSink(
+            RabbitMQConnectionConfig connectionConfig,
+            SerializationSchema<T> serializationSchema,
+            RabbitMQMessageConverter<T> messageConverter,
+            SerializableReturnListener returnListener,
+            String queueName,
+            int maximumInflightMessages,
+            boolean failOnError) {
+        Preconditions.checkNotNull(queueName, "queueName cannot be null");
+        Preconditions.checkNotNull(messageConverter, "messageConverter cannot 
be null");
+        Preconditions.checkNotNull(serializationSchema, "serializationSchema 
cannot be null");
+        Preconditions.checkNotNull(connectionConfig, "connectionConfig cannot 
be null");
+        Preconditions.checkArgument(
+                maximumInflightMessages > 0, "maximumInflightMessages must be 
greater than 0");
+
+        this.connectionConfig = connectionConfig;
+        this.serializationSchema = serializationSchema;
+        this.messageConverter = messageConverter;
+        this.returnListener = returnListener;
+        this.queueName = queueName;
+        this.maximumInflightMessages = maximumInflightMessages;
+        this.failOnError = failOnError;
+    }
+
+    @Override
+    public SinkWriter<T> createWriter(InitContext initContext) throws 
IOException {
+        throw new UnsupportedOperationException("Not Supported");

Review Comment:
   Similar to Pubsub add also descriptive deprecated message



##########
flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/RabbitMQSinkBuilder.java:
##########
@@ -0,0 +1,113 @@
+package org.apache.flink.connector.rabbitmq.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import 
org.apache.flink.connector.rabbitmq.common.DefaultRabbitMQMessageConverter;
+import org.apache.flink.connector.rabbitmq.common.RabbitMQConnectionConfig;
+import org.apache.flink.connector.rabbitmq.common.RabbitMQMessageConverter;
+import org.apache.flink.connector.rabbitmq.common.SerializableReturnListener;
+
+import java.util.Optional;
+
+import static 
org.apache.flink.connector.rabbitmq.common.Constants.DEFAULT_FAIL_ON_ERROR;
+import static 
org.apache.flink.connector.rabbitmq.common.Constants.DEFAULT_MAX_INFLIGHT;
+
+/**
+ * A builder for creating a {@link RabbitMQSink}.
+ *
+ * <p>The builder uses the following parameters to build a {@link 
RabbitMQSink}:
+ *
+ * <ul>
+ *   <li>{@link RabbitMQConnectionConfig} for the connection to RabbitMQ.
+ *   <li>{@link SerializationSchema} for serializing the input data.
+ *   <li>{@link RabbitMQMessageConverter} for converting the input data to 
{@link
+ *       org.apache.flink.connector.rabbitmq.common.RabbitMQMessage}.
+ *   <li>{@link SerializableReturnListener} for handling returned messages.
+ *   <li>{@code queueName} for the name of the queue to send messages to.
+ *   <li>{@code maximumInflightMessages} for the maximum number of in-flight 
messages.
+ *   <li>{@code failOnError} for whether to fail on an error.
+ * </ul>
+ *
+ * <p>It can be used as follows:
+ *
+ * <pre>{@code
+ * RabbitMQSink<String> rabbitMQSink = {@link 
RabbitMQSinkBuilder}.<String>builder()

Review Comment:
   Similar issue, link may not render in code



##########
flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/RabbitMQSinkWriter.java:
##########
@@ -0,0 +1,260 @@
+package org.apache.flink.connector.rabbitmq.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
+import org.apache.flink.connector.rabbitmq.common.RabbitMQConnectionConfig;
+import org.apache.flink.connector.rabbitmq.common.RabbitMQMessage;
+import org.apache.flink.connector.rabbitmq.common.RabbitMQMessageConverter;
+import org.apache.flink.connector.rabbitmq.common.SerializableReturnListener;
+import org.apache.flink.connector.rabbitmq.common.util.RabbitMQConnectionUtil;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.ConfirmCallback;
+import com.rabbitmq.client.Connection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.TreeMap;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * A {@link SinkWriter} to produce data into RabbitMQ. The sink uses the {@link
+ * RabbitMQConnectionConfig} to create a connection to RabbitMQ, uses {@link
+ * RabbitMQMessageConverter} to convert the input data to {@link
+ * org.apache.flink.connector.rabbitmq.common.RabbitMQMessage}, and uses {@link
+ * com.rabbitmq.client.Channel} to send messages to the specified queue.
+ *
+ * <p>The sink writer is stateless and blocks for new writes if the number of 
inflight messages
+ * exceeds the maximum number of inflight messages. The sink writer also 
blocks for inflight
+ * messages before taking snapshots.
+ *
+ * @param <T> input type for the sink.
+ */
+@Internal
+public class RabbitMQSinkWriter<T> implements SinkWriter<T> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(RabbitMQSinkWriter.class);
+
+    /** The name of the queue to send messages to. */
+    private final String queueName;
+
+    /**
+     * The message converter to convert the input data to {@link 
RabbitMQMessage}.
+     *
+     * <p>The MessageConverter also is responsible of defining the routing 
schema of message
+     * publishing, by implementing the {@link 
RabbitMQMessageConverter#supportsExchangeRouting()}
+     * method to signal if the converter supports exchange routing.
+     */
+    private final RabbitMQMessageConverter<T> messageConverter;
+
+    private final SerializationSchema<T> serializationSchema;
+
+    private final SerializableReturnListener returnListener;
+
+    /* Counter for number of bytes this sink has attempted to send to the 
destination. */
+    private final Counter numBytesOutCounter;
+
+    /* Counter for number of records this sink has attempted to send to the 
destination. */
+    private final Counter numRecordsOutCounter;
+
+    private final MailboxExecutor mailboxExecutor;
+
+    /**
+     * The maximum number of inflight messages. The sink writer blocks for new 
writes if the number
+     * of inflight messages exceeds this value.
+     */
+    private final int maximumInflightMessages;
+
+    /* Flag to indicate if the sink should fail on error. */
+    private final boolean failOnError;
+
+    /**
+     * Map to hold inflightMessages, the {@code getSuccessConfirmCallback} and 
{@code
+     * getFailureConfirmCallback} are triggered using sequence numbers hence 
we keep the mapping of
+     * sequence number to the message. We are using a sorted map to evict all 
inflight requests with
+     * sequence number less than or equal to the sequence number of the 
message that was
+     * acknowledged if flagged to acknowledge all previous messages.
+     */
+    private final TreeMap<Long, RabbitMQMessage<T>> inflightMessages;
+
+    private Connection connection;
+    private Channel channel;
+
+    public RabbitMQSinkWriter(
+            WriterInitContext context,
+            RabbitMQConnectionConfig connectionConfig,
+            String queueName,
+            RabbitMQMessageConverter<T> messageConverter,
+            SerializationSchema<T> serializationSchema,
+            SerializableReturnListener returnListener,
+            int maximumInflightMessages,
+            boolean failOnError)
+            throws IOException {
+        this(
+                context,
+                queueName,
+                messageConverter,
+                serializationSchema,
+                returnListener,
+                maximumInflightMessages,
+                failOnError);
+        Preconditions.checkNotNull(connectionConfig, "connectionConfig cannot 
be null");
+        try {
+            Connection connection =
+                    
RabbitMQConnectionUtil.getConnectionFactory(connectionConfig).newConnection();
+            initializeConnection(connection);
+        } catch (TimeoutException e) {
+            throw new IOException("Failed to create connection", e);
+        }
+    }
+
+    @VisibleForTesting
+    RabbitMQSinkWriter(
+            WriterInitContext context,
+            String queueName,
+            RabbitMQMessageConverter<T> messageConverter,
+            SerializationSchema<T> serializationSchema,
+            SerializableReturnListener returnListener,
+            int maximumInflightMessages,
+            boolean failOnError) {
+        Preconditions.checkNotNull(context, "context cannot be null");
+
+        this.mailboxExecutor = context.getMailboxExecutor();
+        this.maximumInflightMessages = maximumInflightMessages;
+        this.failOnError = failOnError;
+
+        SinkWriterMetricGroup metricGroup = context.metricGroup();
+        this.queueName = queueName;
+        this.messageConverter = messageConverter;
+        this.serializationSchema = serializationSchema;
+        this.returnListener = returnListener;
+
+        this.numBytesOutCounter = 
metricGroup.getIOMetricGroup().getNumBytesOutCounter();
+        this.numRecordsOutCounter = 
metricGroup.getIOMetricGroup().getNumRecordsOutCounter();
+        this.inflightMessages = new TreeMap<>();
+    }
+
+    @VisibleForTesting
+    void initializeConnection(Connection connection) throws IOException {
+        this.connection = connection;
+        this.channel = connection.createChannel();
+        channel.addReturnListener(returnListener);
+        channel.addConfirmListener(getSuccessConfirmCallback(), 
getFailureConfirmCallback());
+        channel.confirmSelect();
+        channel.queueDeclare(queueName, true, false, false, null);
+    }
+
+    @Override
+    public void write(T t, Context context) throws IOException, 
InterruptedException {
+        awaitInflightMessagesBelow(maximumInflightMessages);
+        RabbitMQMessage<T> recordMessage = 
messageConverter.toRabbitMQMessage(t);
+        publishMessage(recordMessage);
+    }
+
+    @Override
+    public void flush(boolean endOfInput) throws IOException, 
InterruptedException {
+        awaitInflightMessagesBelow(1);
+    }
+
+    @Override
+    public void close() throws Exception {
+        awaitInflightMessagesBelow(1);
+        channel.close();
+        connection.close();
+    }
+
+    private void awaitInflightMessagesBelow(int maximumInflightMessages)
+            throws InterruptedException {
+        while (inflightMessages.size() >= maximumInflightMessages) {
+            mailboxExecutor.yield();
+        }
+    }
+
+    private void publishMessage(RabbitMQMessage<T> recordMessage) throws 
IOException {
+        byte[] message = 
serializationSchema.serialize(recordMessage.getMessage());
+        Long sequenceNumber = channel.getNextPublishSeqNo();
+
+        if (messageConverter.supportsExchangeRouting()) {
+            Preconditions.checkArgument(
+                    returnListener != null
+                            || !(recordMessage.isImmediate() || 
recordMessage.isMandatory()),
+                    "Return listener must be set if immediate or mandatory 
delivery is requested");
+
+            channel.basicPublish(
+                    recordMessage.getExchange(),
+                    recordMessage.getRoutingKey(),
+                    recordMessage.isMandatory(),
+                    recordMessage.isImmediate(),
+                    recordMessage.getMessageProperties(),
+                    message);
+
+        } else {
+            channel.basicPublish(
+                    recordMessage.getExchange(),
+                    queueName,
+                    recordMessage.getMessageProperties(),
+                    message);
+        }
+
+        inflightMessages.put(sequenceNumber, recordMessage);
+    }
+
+    private ConfirmCallback getSuccessConfirmCallback() {
+        return (seqNo, acknowledgePrevious) ->
+                mailboxExecutor.execute(
+                        () -> {
+                            if (acknowledgePrevious) {
+                                LOG.debug(
+                                        "Acknowledge all messages with 
sequence number less than or equal to {}",
+                                        seqNo);
+                                while (!inflightMessages.isEmpty()
+                                        && inflightMessages.firstKey() <= 
(Long) seqNo) {
+                                    RabbitMQMessage<T> message =
+                                            
inflightMessages.remove(inflightMessages.firstKey());
+                                    numBytesOutCounter.inc(
+                                            
serializationSchema.serialize(message.getMessage())

Review Comment:
   I would suggest to have another record for messages with size. We already 
serialize the message on publish, so that we can avoid serialization again here 
when updating counters



##########
flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/common/RabbitMQMessage.java:
##########
@@ -0,0 +1,125 @@
+package org.apache.flink.connector.rabbitmq.common;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import com.rabbitmq.client.AMQP.BasicProperties;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+
+/**
+ * A message to be sent to RabbitMQ with publish options.
+ *
+ * @param <T> type of the message to be sent
+ */
+@PublicEvolving
+public class RabbitMQMessage<T> implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final T message;
+
+    private final boolean isMandatory;
+
+    private final boolean isImmediate;
+
+    @Nullable private final String routingKey;
+
+    @Nullable private final String exchange;
+
+    private final BasicProperties messageProperties;
+
+    public RabbitMQMessage(
+            T message,
+            @Nullable String routingKey,
+            @Nullable String exchange,
+            BasicProperties messageProperties,
+            boolean isMandatory,
+            boolean isImmediate) {
+        this.message = message;
+        this.routingKey = routingKey;
+        this.exchange = exchange;
+        this.messageProperties = messageProperties;
+        this.isMandatory = isMandatory;
+        this.isImmediate = isImmediate;
+    }

Review Comment:
   Builder constructor is missing.
   
   



##########
flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/RabbitMQSinkWriter.java:
##########
@@ -0,0 +1,260 @@
+package org.apache.flink.connector.rabbitmq.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
+import org.apache.flink.connector.rabbitmq.common.RabbitMQConnectionConfig;
+import org.apache.flink.connector.rabbitmq.common.RabbitMQMessage;
+import org.apache.flink.connector.rabbitmq.common.RabbitMQMessageConverter;
+import org.apache.flink.connector.rabbitmq.common.SerializableReturnListener;
+import org.apache.flink.connector.rabbitmq.common.util.RabbitMQConnectionUtil;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.ConfirmCallback;
+import com.rabbitmq.client.Connection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.TreeMap;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * A {@link SinkWriter} to produce data into RabbitMQ. The sink uses the {@link
+ * RabbitMQConnectionConfig} to create a connection to RabbitMQ, uses {@link
+ * RabbitMQMessageConverter} to convert the input data to {@link
+ * org.apache.flink.connector.rabbitmq.common.RabbitMQMessage}, and uses {@link
+ * com.rabbitmq.client.Channel} to send messages to the specified queue.
+ *
+ * <p>The sink writer is stateless and blocks for new writes if the number of 
inflight messages
+ * exceeds the maximum number of inflight messages. The sink writer also 
blocks for inflight
+ * messages before taking snapshots.
+ *
+ * @param <T> input type for the sink.
+ */
+@Internal
+public class RabbitMQSinkWriter<T> implements SinkWriter<T> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(RabbitMQSinkWriter.class);
+
+    /** The name of the queue to send messages to. */
+    private final String queueName;
+
+    /**
+     * The message converter to convert the input data to {@link 
RabbitMQMessage}.
+     *
+     * <p>The MessageConverter also is responsible of defining the routing 
schema of message
+     * publishing, by implementing the {@link 
RabbitMQMessageConverter#supportsExchangeRouting()}
+     * method to signal if the converter supports exchange routing.
+     */
+    private final RabbitMQMessageConverter<T> messageConverter;
+
+    private final SerializationSchema<T> serializationSchema;
+
+    private final SerializableReturnListener returnListener;
+
+    /* Counter for number of bytes this sink has attempted to send to the 
destination. */
+    private final Counter numBytesOutCounter;
+
+    /* Counter for number of records this sink has attempted to send to the 
destination. */
+    private final Counter numRecordsOutCounter;
+
+    private final MailboxExecutor mailboxExecutor;
+
+    /**
+     * The maximum number of inflight messages. The sink writer blocks for new 
writes if the number
+     * of inflight messages exceeds this value.
+     */
+    private final int maximumInflightMessages;
+
+    /* Flag to indicate if the sink should fail on error. */
+    private final boolean failOnError;
+
+    /**
+     * Map to hold inflightMessages, the {@code getSuccessConfirmCallback} and 
{@code
+     * getFailureConfirmCallback} are triggered using sequence numbers hence 
we keep the mapping of
+     * sequence number to the message. We are using a sorted map to evict all 
inflight requests with
+     * sequence number less than or equal to the sequence number of the 
message that was
+     * acknowledged if flagged to acknowledge all previous messages.
+     */

Review Comment:
   👍 



##########
flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/common/RabbitMQConnectionConfig.java:
##########
@@ -0,0 +1,465 @@
+package org.apache.flink.connector.rabbitmq.common;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
+import 
org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+/** Configuration class for RabbitMQ connections. */
+@PublicEvolving
+public class RabbitMQConnectionConfig implements Serializable {
+    private static final long DEFAULT_DELIVERY_TIMEOUT = 30000;
+
+    private String host;
+    private Integer port;
+    private String virtualHost;
+    private String username;
+    private String password;
+    private String uri;
+
+    private Integer networkRecoveryInterval;
+    private Boolean automaticRecovery;
+    private Boolean topologyRecovery;
+
+    private Integer connectionTimeout;
+    private Integer requestedChannelMax;
+    private Integer requestedFrameMax;
+    private Integer requestedHeartbeat;

Review Comment:
   Javadocs mentions defaults already, let set them if they are null



##########
flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java:
##########
@@ -41,6 +41,7 @@
  *
  * @param <IN>
  */
+@Deprecated
 public class RMQSink<IN> extends RichSinkFunction<IN> {

Review Comment:
   👍 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to