[ https://issues.apache.org/jira/browse/FLINK-8468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16503067#comment-16503067 ]

ASF GitHub Bot commented on FLINK-8468:
---------------------------------------

Github user pduveau commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5410#discussion_r193356675
  
    --- Diff: flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java ---
    @@ -124,7 +159,82 @@ public void closeAllResources() throws Exception {
                verify(connection).close();
        }
     
    +   @Test
    +   public void invokeFeaturedPublishBytesToQueue() throws Exception {
    +           RMQSink<String> rmqSink = createRMQSinkFeatured();
    +
    +           rmqSink.invoke(MESSAGE_STR, SinkContextUtil.forTimestamp(0));
    +           verify(serializationSchema).serialize(MESSAGE_STR);
    +           verify(channel).basicPublish(EXCHANGE, ROUTING_KEY, false, false,
    +                           publishOptions.computeProperties(""), MESSAGE);
    +   }
    +
    +   @Test
    +   public void invokeFeaturedReturnHandlerPublishBytesToQueue() throws Exception {
    +           RMQSink<String> rmqSink = createRMQSinkFeaturedReturnHandler();
    +
    +           rmqSink.invoke(MESSAGE_STR, SinkContextUtil.forTimestamp(0));
    +           verify(serializationSchema).serialize(MESSAGE_STR);
    +           verify(channel).basicPublish(EXCHANGE, ROUTING_KEY, true, true,
    +                           publishOptions.computeProperties(""), MESSAGE);
    +   }
    +
    +   @Test(expected = RuntimeException.class)
    +   public void exceptionDuringFeaturedPublishingIsNotIgnored() throws Exception {
    +           RMQSink<String> rmqSink = createRMQSinkFeatured();
    +
    +           doThrow(IOException.class).when(channel).basicPublish(EXCHANGE, ROUTING_KEY, false, false,
    +                           publishOptions.computeProperties(""), MESSAGE);
    +           rmqSink.invoke("msg", SinkContextUtil.forTimestamp(0));
    +   }
    +
    +   @Test
    +   public void exceptionDuringFeaturedPublishingIsIgnoredIfLogFailuresOnly() throws Exception {
    +           RMQSink<String> rmqSink = createRMQSinkFeatured();
    +           rmqSink.setLogFailuresOnly(true);
    +
    +           doThrow(IOException.class).when(channel).basicPublish(EXCHANGE, ROUTING_KEY, false, false,
    +                           publishOptions.computeProperties(""), MESSAGE);
    +           rmqSink.invoke("msg", SinkContextUtil.forTimestamp(0));
    +   }
    +
    +   private class DummyPublishOptions implements RMQSinkPublishOptions<String> {
    --- End diff --
    
    Done


> Make the connector to take advantage of AMQP features (routing key, exchange and message properties)
> ----------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-8468
>                 URL: https://issues.apache.org/jira/browse/FLINK-8468
>             Project: Flink
>          Issue Type: Improvement
>          Components: RabbitMQ Connector
>    Affects Versions: 1.4.0
>            Reporter: Ph.Duveau
>            Priority: Major
>
> Make the connector take advantage of AMQP features by adding a constructor and an interface to implement.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
