Github user pduveau commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5410#discussion_r193356675
  
    --- Diff: flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java ---
    @@ -124,7 +159,82 @@ public void closeAllResources() throws Exception {
                verify(connection).close();
        }
     
    +   @Test
    +   public void invokeFeaturedPublishBytesToQueue() throws Exception {
    +           RMQSink<String> rmqSink = createRMQSinkFeatured();
    +
    +           rmqSink.invoke(MESSAGE_STR, SinkContextUtil.forTimestamp(0));
    +           verify(serializationSchema).serialize(MESSAGE_STR);
    +           verify(channel).basicPublish(EXCHANGE, ROUTING_KEY, false, false,
    +                           publishOptions.computeProperties(""), MESSAGE);
    +   }
    +
    +   @Test
    +   public void invokeFeaturedReturnHandlerPublishBytesToQueue() throws Exception {
    +           RMQSink<String> rmqSink = createRMQSinkFeaturedReturnHandler();
    +
    +           rmqSink.invoke(MESSAGE_STR, SinkContextUtil.forTimestamp(0));
    +           verify(serializationSchema).serialize(MESSAGE_STR);
    +           verify(channel).basicPublish(EXCHANGE, ROUTING_KEY, true, true,
    +                           publishOptions.computeProperties(""), MESSAGE);
    +   }
    +
    +   @Test(expected = RuntimeException.class)
    +   public void exceptionDuringFeaturedPublishingIsNotIgnored() throws Exception {
    +           RMQSink<String> rmqSink = createRMQSinkFeatured();
    +
    +           doThrow(IOException.class).when(channel).basicPublish(EXCHANGE, ROUTING_KEY, false, false,
    +                           publishOptions.computeProperties(""), MESSAGE);
    +           rmqSink.invoke("msg", SinkContextUtil.forTimestamp(0));
    +   }
    +
    +   @Test
    +   public void exceptionDuringFeaturedPublishingIsIgnoredIfLogFailuresOnly() throws Exception {
    +           RMQSink<String> rmqSink = createRMQSinkFeatured();
    +           rmqSink.setLogFailuresOnly(true);
    +
    +           doThrow(IOException.class).when(channel).basicPublish(EXCHANGE, ROUTING_KEY, false, false,
    +                           publishOptions.computeProperties(""), MESSAGE);
    +           rmqSink.invoke("msg", SinkContextUtil.forTimestamp(0));
    +   }
    +
    +   private class DummyPublishOptions implements RMQSinkPublishOptions<String> {
    --- End diff --
    
    Done


---

Reply via email to